You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/07/05 10:54:09 UTC

[1/4] mesos git commit: Performed validation/normalization of `Resource`s before authorization.

Repository: mesos
Updated Branches:
  refs/heads/master 21de4b671 -> 2617482f0


Performed validation/normalization of `Resource`s before authorization.

Review: https://reviews.apache.org/r/60564


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e097f211
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e097f211
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e097f211

Branch: refs/heads/master
Commit: e097f21124d16c80544af7aabc83bab5fb712453
Parents: 63c8b1d
Author: Michael Park <mp...@apache.org>
Authored: Thu Jun 29 23:49:10 2017 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Jul 5 03:41:37 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp      | 205 ++++++++++++++++++++++------------------
 src/tests/master_tests.cpp | 101 ++++++++++++++++++++
 2 files changed, 214 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e097f211/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index da0a13b..56b170e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3925,6 +3925,85 @@ void Master::accept(
     return;
   }
 
+  // Validate and upgrade all of the resources in `accept.operations`.
+  //
+  // If a RESERVE, UNRESERVE, CREATE, or DESTROY operation
+  // contains invalid resources, we just drop the operation.
+  //
+  // If a LAUNCH or LAUNCH_GROUP operation contains invalid
+  // resources, we send a TASK_ERROR status update per task.
+  {
+    // We move out the `accept.operations`, and re-insert the operations
+    // with the resources validated and upgraded.
+    RepeatedPtrField<Offer::Operation> operations = accept.operations();
+    accept.clear_operations();
+
+    foreach (Offer::Operation& operation, operations) {
+      Option<Error> error = validateAndNormalizeResources(&operation);
+      if (error.isSome()) {
+        // We send TASK_ERROR status updates for tasks in invalid LAUNCH
+        // and LAUNCH_GROUP operations. Note that we don't need to recover
+        // the resources here because we always continue onto `_accept`
+        // which recovers the unused resources at the end.
+        // TODO(mpark): Consider pulling this out in a more reusable manner.
+        auto sendStatusUpdates = [&](
+            const RepeatedPtrField<TaskInfo>& tasks,
+            TaskStatus::Reason reason) {
+          foreach (const TaskInfo& task, tasks) {
+            const StatusUpdate& update = protobuf::createStatusUpdate(
+                framework->id(),
+                task.slave_id(),
+                task.task_id(),
+                TASK_ERROR,
+                TaskStatus::SOURCE_MASTER,
+                None(),
+                error->message,
+                reason);
+
+            metrics->tasks_error++;
+
+            metrics->incrementTasksStates(
+                TASK_ERROR, TaskStatus::SOURCE_MASTER, reason);
+
+            forward(update, UPID(), framework);
+          }
+        };
+
+        switch (operation.type()) {
+          case Offer::Operation::RESERVE:
+          case Offer::Operation::UNRESERVE:
+          case Offer::Operation::CREATE:
+          case Offer::Operation::DESTROY: {
+            drop(framework, operation, error->message);
+            break;
+          }
+          case Offer::Operation::LAUNCH: {
+            sendStatusUpdates(
+                operation.launch().task_infos(),
+                TaskStatus::REASON_TASK_INVALID);
+
+            break;
+          }
+          case Offer::Operation::LAUNCH_GROUP: {
+            sendStatusUpdates(
+                operation.launch_group().task_group().tasks(),
+                TaskStatus::REASON_TASK_GROUP_INVALID);
+
+            break;
+          }
+          case Offer::Operation::UNKNOWN: {
+            LOG(WARNING) << "Ignoring unknown offer operation";
+            break;
+          }
+        }
+
+        continue;
+      }
+
+      accept.add_operations()->CopyFrom(operation);
+    }
+  }
+
   // We make various adjustments to the `Offer::Operation`s,
   // typically for backward/forward compatibility.
   // TODO(mpark): Pull this out to a master normalization utility.
@@ -4258,9 +4337,7 @@ void Master::_accept(
   CHECK_READY(_authorizations);
   list<Future<bool>> authorizations = _authorizations.get();
 
-  // We iterate by copy here since we call `validateAndUpgradeResources`
-  // on it which modifies the `Operation`.
-  foreach (Offer::Operation operation, accept.operations()) {
+  foreach (const Offer::Operation& operation, accept.operations()) {
     switch (operation.type()) {
       // The RESERVE operation allows a principal to reserve resources.
       case Offer::Operation::RESERVE: {
@@ -4287,21 +4364,16 @@ void Master::_accept(
           continue;
         }
 
-        Option<Error> error = validateAndUpgradeResources(
-            operation.mutable_reserve()->mutable_resources());
-
-        if (error.isNone()) {
-          Option<Principal> principal = framework->info.has_principal()
-            ? Principal(framework->info.principal())
-            : Option<Principal>::none();
+        Option<Principal> principal = framework->info.has_principal()
+                                        ? Principal(framework->info.principal())
+                                        : Option<Principal>::none();
 
-          // Make sure this reserve operation is valid.
-          error = validation::operation::validate(
-              operation.reserve(),
-              principal,
-              slave->capabilities,
-              framework->info);
-        }
+        // Make sure this reserve operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.reserve(),
+            principal,
+            slave->capabilities,
+            framework->info);
 
         if (error.isSome()) {
           drop(
@@ -4357,13 +4429,9 @@ void Master::_accept(
           continue;
         }
 
-        Option<Error> error = validateAndUpgradeResources(
-            operation.mutable_unreserve()->mutable_resources());
-
-        if (error.isNone()) {
-          // Make sure this unreserve operation is valid.
-          error = validation::operation::validate(operation.unreserve());
-        }
+        // Make sure this unreserve operation is valid.
+        Option<Error> error =
+          validation::operation::validate(operation.unreserve());
 
         if (error.isSome()) {
           drop(framework, operation, error->message);
@@ -4415,22 +4483,17 @@ void Master::_accept(
           continue;
         }
 
-        Option<Error> error = validateAndUpgradeResources(
-            operation.mutable_create()->mutable_volumes());
-
-        if (error.isNone()) {
-          Option<Principal> principal = framework->info.has_principal()
-            ? Principal(framework->info.principal())
-            : Option<Principal>::none();
-
-          // Make sure this create operation is valid.
-          error = validation::operation::validate(
-              operation.create(),
-              slave->checkpointedResources,
-              principal,
-              slave->capabilities,
-              framework->info);
-        }
+        Option<Principal> principal = framework->info.has_principal()
+                                        ? Principal(framework->info.principal())
+                                        : Option<Principal>::none();
+
+        // Make sure this create operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.create(),
+            slave->checkpointedResources,
+            principal,
+            slave->capabilities,
+            framework->info);
 
         if (error.isSome()) {
           drop(
@@ -4485,17 +4548,12 @@ void Master::_accept(
           continue;
         }
 
-        Option<Error> error = validateAndUpgradeResources(
-            operation.mutable_destroy()->mutable_volumes());
-
-        if (error.isNone()) {
-          // Make sure this destroy operation is valid.
-          error = validation::operation::validate(
-              operation.destroy(),
-              slave->checkpointedResources,
-              slave->usedResources,
-              slave->pendingTasks);
-        }
+        // Make sure this destroy operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.destroy(),
+            slave->checkpointedResources,
+            slave->usedResources,
+            slave->pendingTasks);
 
         if (error.isSome()) {
           drop(framework, operation, error->message);
@@ -4553,8 +4611,7 @@ void Master::_accept(
         Offer::Operation _operation;
         _operation.set_type(Offer::Operation::LAUNCH);
 
-        foreach (
-            TaskInfo& task, *operation.mutable_launch()->mutable_task_infos()) {
+        foreach (const TaskInfo& task, operation.launch().task_infos()) {
           Future<bool> authorization = authorizations.front();
           authorizations.pop_front();
 
@@ -4625,17 +4682,7 @@ void Master::_accept(
             _offeredResources.nonShared() + offeredSharedResources;
 
           Option<Error> error =
-            validateAndUpgradeResources(task.mutable_resources());
-
-          if (error.isNone() && task.has_executor()) {
-            error = validateAndUpgradeResources(
-                task.mutable_executor()->mutable_resources());
-          }
-
-          if (error.isNone()) {
-            error =
-              validation::task::validate(task, framework, slave, available);
-          }
+            validation::task::validate(task, framework, slave, available);
 
           if (error.isSome()) {
             const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -4743,30 +4790,6 @@ void Master::_accept(
           }
         }
 
-        Offer::Operation::LaunchGroup* launchGroup =
-          operation.mutable_launch_group();
-
-        Option<Error> error;
-
-        if (launchGroup->has_executor()) {
-          error = validateAndUpgradeResources(
-              launchGroup->mutable_executor()->mutable_resources());
-        }
-
-        foreach (
-            TaskInfo& task,
-            *launchGroup->mutable_task_group()->mutable_tasks()) {
-          if (error.isSome()) {
-            break;
-          }
-
-          error = validateAndUpgradeResources(task.mutable_resources());
-          if (error.isNone() && task.has_executor()) {
-            error = validateAndUpgradeResources(
-                task.mutable_executor()->mutable_resources());
-          }
-        }
-
         // Note that we do not fill in the `ExecutorInfo.framework_id`
         // since we do not have to support backwards compatibility like
         // in the `Launch` operation case.
@@ -4781,10 +4804,8 @@ void Master::_accept(
         // TODO(anindya_sinha): If task group uses shared resources, this
         // validation needs to be enhanced to accommodate multiple copies
         // of shared resources across tasks within the task group.
-        if (error.isNone()) {
-          error = validation::task::group::validate(
-              taskGroup, executor, framework, slave, _offeredResources);
-        }
+        Option<Error> error = validation::task::group::validate(
+            taskGroup, executor, framework, slave, _offeredResources);
 
         Option<TaskStatus::Reason> reason = None();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e097f211/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d21194f..f03f119 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -7675,6 +7675,107 @@ TEST_P(MasterTestPrePostReservationRefinement,
   driver.join();
 }
 
+
+// This test verifies that hitting the `/state` endpoint before '_accept()'
+// is called results in pending tasks being reported correctly.
+TEST_P(MasterTestPrePostReservationRefinement, StateEndpointPendingTasks)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  // TODO(mpark): Remove this once `RESERVATION_REFINEMENT`
+  // is removed from `DEFAULT_FRAMEWORK_INFO`.
+  frameworkInfo.clear_capabilities();
+
+  if (GetParam()) {
+    frameworkInfo.add_capabilities()->set_type(
+        FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+  }
+
+  MockAuthorizer authorizer;
+  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers->size());
+
+  Offer offer = offers->front();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> authorize;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorized(_))
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
+                    Return(promise.future())));
+
+  driver.launchTasks(offer.id(), {task});
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(authorize);
+
+  Future<Response> response = process::http::get(
+      master.get()->pid,
+      "state",
+      None(),
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+  ASSERT_SOME(parse);
+
+  JSON::Value result = parse.get();
+
+  JSON::Object expected = {
+    {
+      "frameworks",
+      JSON::Array {
+        JSON::Object {
+          {
+            "tasks",
+            JSON::Array {
+              JSON::Object {
+                { "id", "1" },
+                { "role", frameworkInfo.role() },
+                { "state", "TASK_STAGING" }
+              }
+            }
+          }
+        }
+      }
+    }
+  };
+
+  EXPECT_TRUE(result.contains(expected));
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/4] mesos git commit: Updated `validateAndNormalizeResources` to operate on `Operation`s.

Posted by mp...@apache.org.
Updated `validateAndNormalizeResources` to operate on `Operation`s.

Review: https://reviews.apache.org/r/60563


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/63c8b1d6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/63c8b1d6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/63c8b1d6

Branch: refs/heads/master
Commit: 63c8b1d63be363e96d7c2672d9d65b9d67ca48ed
Parents: 710b721
Author: Michael Park <mp...@apache.org>
Authored: Thu Jun 29 23:30:16 2017 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Jul 5 03:41:37 2017 -0700

----------------------------------------------------------------------
 src/common/resources_utils.cpp | 213 ++++++++++++++++++++++++++++++++++--
 src/common/resources_utils.hpp |  15 ++-
 2 files changed, 215 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/63c8b1d6/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 3a6a578..821bd09 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -196,16 +196,215 @@ void convertResourceFormat(
 }
 
 
-Option<Error> validateAndUpgradeResources(RepeatedPtrField<Resource>* resources)
+Option<Error> validateAndNormalizeResources(Offer::Operation* operation)
 {
-  Option<Error> error = Resources::validate(*resources);
-  if (error.isSome()) {
-    return Error("Invalid resources upgrade: " + error->message);
-  }
+  CHECK_NOTNULL(operation);
+
+  switch (operation->type()) {
+    case Offer::Operation::RESERVE: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_reserve()) {
+        return Error(
+            "A RESERVE offer operation must have"
+            " the Offer.Operation.reserve field set.");
+      }
+
+      Option<Error> error =
+        Resources::validate(operation->reserve().resources());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      convertResourceFormat(
+          operation->mutable_reserve()->mutable_resources(),
+          POST_RESERVATION_REFINEMENT);
+
+      return None();
+    }
+    case Offer::Operation::UNRESERVE: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_unreserve()) {
+        return Error(
+            "An UNRESERVE offer operation must have"
+            " the Offer.Operation.unreserve field set.");
+      }
+
+      Option<Error> error =
+        Resources::validate(operation->unreserve().resources());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      convertResourceFormat(
+          operation->mutable_unreserve()->mutable_resources(),
+          POST_RESERVATION_REFINEMENT);
+
+      return None();
+    }
+    case Offer::Operation::CREATE: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_create()) {
+        return Error(
+            "A CREATE offer operation must have"
+            " the Offer.Operation.create field set.");
+      }
+
+      Option<Error> error =
+        Resources::validate(operation->create().volumes());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      convertResourceFormat(
+          operation->mutable_create()->mutable_volumes(),
+          POST_RESERVATION_REFINEMENT);
+
+      return None();
+    }
+    case Offer::Operation::DESTROY: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_destroy()) {
+        return Error(
+            "A DESTROY offer operation must have"
+            " the Offer.Operation.destroy field set.");
+      }
+
+      Option<Error> error =
+        Resources::validate(operation->destroy().volumes());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      convertResourceFormat(
+          operation->mutable_destroy()->mutable_volumes(),
+          POST_RESERVATION_REFINEMENT);
+
+      return None();
+    }
+    case Offer::Operation::LAUNCH: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_launch()) {
+        return Error(
+            "A LAUNCH offer operation must have"
+            " the Offer.Operation.launch field set.");
+      }
+
+      // Validate resources in LAUNCH.
+      foreach (const TaskInfo& task, operation->launch().task_infos()) {
+        Option<Error> error = Resources::validate(task.resources());
+        if (error.isSome()) {
+          return error;
+        }
 
-  convertResourceFormat(resources, POST_RESERVATION_REFINEMENT);
+        if (task.has_executor()) {
+          Option<Error> error =
+            Resources::validate(task.executor().resources());
 
-  return None();
+          if (error.isSome()) {
+            return error;
+          }
+        }
+      }
+
+      // Normalize resources in LAUNCH.
+      foreach (
+          TaskInfo& task, *operation->mutable_launch()->mutable_task_infos()) {
+        convertResourceFormat(
+            task.mutable_resources(),
+            POST_RESERVATION_REFINEMENT);
+
+        if (task.has_executor()) {
+          convertResourceFormat(
+              task.mutable_executor()->mutable_resources(),
+              POST_RESERVATION_REFINEMENT);
+        }
+      }
+
+      return None();
+    }
+    case Offer::Operation::LAUNCH_GROUP: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_launch_group()) {
+        return Error(
+            "A LAUNCH_GROUP offer operation must have"
+            " the Offer.Operation.launch_group field set.");
+      }
+
+      Offer::Operation::LaunchGroup* launchGroup =
+        operation->mutable_launch_group();
+
+      // Validate resources in LAUNCH_GROUP.
+      if (launchGroup->has_executor()) {
+        Option<Error> error =
+          Resources::validate(launchGroup->executor().resources());
+
+        if (error.isSome()) {
+          return error;
+        }
+      }
+
+      foreach (const TaskInfo& task, launchGroup->task_group().tasks()) {
+        Option<Error> error = Resources::validate(task.resources());
+        if (error.isSome()) {
+          return error;
+        }
+
+        if (task.has_executor()) {
+          Option<Error> error =
+            Resources::validate(task.executor().resources());
+
+          if (error.isSome()) {
+            return error;
+          }
+        }
+      }
+
+      // Normalize resources in LAUNCH_GROUP.
+      if (launchGroup->has_executor()) {
+        convertResourceFormat(
+            launchGroup->mutable_executor()->mutable_resources(),
+            POST_RESERVATION_REFINEMENT);
+      }
+
+      foreach (
+          TaskInfo& task, *launchGroup->mutable_task_group()->mutable_tasks()) {
+        convertResourceFormat(
+            task.mutable_resources(),
+            POST_RESERVATION_REFINEMENT);
+
+        if (task.has_executor()) {
+          convertResourceFormat(
+              task.mutable_executor()->mutable_resources(),
+              POST_RESERVATION_REFINEMENT);
+        }
+      }
+
+      return None();
+    }
+    case Offer::Operation::UNKNOWN: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      return Error("Unknown offer operation");
+    }
+  }
+  UNREACHABLE();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/63c8b1d6/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
index 7128297..18e3d9d 100644
--- a/src/common/resources_utils.hpp
+++ b/src/common/resources_utils.hpp
@@ -132,12 +132,15 @@ void convertResourceFormat(
     ResourceFormat format);
 
 
-// Convert the given resources to the "post-reservation-refinement" format
-// from any format ("pre-", "post-" or "endpoint") if all of the resources
-// are valid. Returns an `Error` if there are any invalid resources present;
-// in this case, the resources are left unchanged.
-Option<Error> validateAndUpgradeResources(
-    google::protobuf::RepeatedPtrField<Resource>* resources);
+// Convert the resources in the given `Operation` to the
+// "post-reservation-refinement" format from any format
+// ("pre-", "post-" or "endpoint") if all of the resources are valid.
+// Returns an `Error` if there are any invalid resources present;
+// in this case, all resources are left unchanged.
+// NOTE: The validate and normalize steps are bundled because currently
+// it would be an error to validate but not normalize or to normalize
+// without validating.
+Option<Error> validateAndNormalizeResources(Offer::Operation* operation);
 
 
 // Convert the given resources to the "pre-reservation-refinement" format


[4/4] mesos git commit: Validated and normalized resources on the V1 operator API path.

Posted by mp...@apache.org.
Validated and normalized resources on the V1 operator API path.

Previously, validation and normalization of resources were incorrectly
performed only on the V0 operator API. This patch also performs them
on the V1 operator API by changing the common codepath taken by V0
and V1. We also update the V0 codepath to defer the construction of
`Resources` until we get to the common codepath.

Review: https://reviews.apache.org/r/60566


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2617482f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2617482f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2617482f

Branch: refs/heads/master
Commit: 2617482f0b2b446dfa4b42c85ce5356fefe4f0b7
Parents: e097f21
Author: Michael Park <mp...@apache.org>
Authored: Fri Jun 30 02:37:10 2017 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Jul 5 03:42:58 2017 -0700

----------------------------------------------------------------------
 src/master/http.cpp                             | 112 +++++++-------
 src/master/master.hpp                           |   4 +-
 src/tests/master_tests.cpp                      | 145 +++++++++++++++++++
 src/tests/persistent_volume_endpoints_tests.cpp |   8 +-
 src/tests/reservation_endpoints_tests.cpp       |  10 +-
 5 files changed, 206 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 64b7cdd..175a44c 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1080,7 +1080,7 @@ Future<Response> Master::Http::createVolumes(
         parse.error());
   }
 
-  Resources volumes;
+  RepeatedPtrField<Resource> volumes;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> volume = ::protobuf::parse<Resource>(value);
     if (volume.isError()) {
@@ -1089,16 +1089,7 @@ Future<Response> Master::Http::createVolumes(
           volume.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(volume.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&volume.get(), POST_RESERVATION_REFINEMENT);
-
-    volumes += volume.get();
+    volumes.Add()->CopyFrom(volume.get());
   }
 
   return _createVolumes(slaveId, volumes, principal);
@@ -1120,16 +1111,21 @@ Future<Response> Master::Http::_createVolumes(
   operation.set_type(Offer::Operation::CREATE);
   operation.mutable_create()->mutable_volumes()->CopyFrom(volumes);
 
-  Option<Error> validate = validation::operation::validate(
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
       operation.create(),
       slave->checkpointedResources,
       principal,
       slave->capabilities);
 
-  if (validate.isSome()) {
+  if (error.isSome()) {
     return BadRequest(
         "Invalid CREATE operation on agent " + stringify(*slave) + ": " +
-        validate.get().message);
+        error->message);
   }
 
   return master->authorizeCreateVolume(operation.create(), principal)
@@ -1141,7 +1137,8 @@ Future<Response> Master::Http::_createVolumes(
       // The resources required for this operation are equivalent to the
       // volumes specified by the user minus any DiskInfo (DiskInfo will
       // be created when this operation is applied).
-      return _operation(slaveId, removeDiskInfos(volumes), operation);
+      return _operation(
+          slaveId, removeDiskInfos(operation.create().volumes()), operation);
     }));
 }
 
@@ -1257,7 +1254,7 @@ Future<Response> Master::Http::destroyVolumes(
         parse.error());
   }
 
-  Resources volumes;
+  RepeatedPtrField<Resource> volumes;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> volume = ::protobuf::parse<Resource>(value);
     if (volume.isError()) {
@@ -1266,16 +1263,7 @@ Future<Response> Master::Http::destroyVolumes(
           volume.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(volume.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&volume.get(), POST_RESERVATION_REFINEMENT);
-
-    volumes += volume.get();
+    volumes.Add()->CopyFrom(volume.get());
   }
 
   return _destroyVolumes(slaveId, volumes, principal);
@@ -1297,14 +1285,19 @@ Future<Response> Master::Http::_destroyVolumes(
   operation.set_type(Offer::Operation::DESTROY);
   operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
 
-  Option<Error> validate = validation::operation::validate(
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
       operation.destroy(),
       slave->checkpointedResources,
       slave->usedResources,
       slave->pendingTasks);
 
-  if (validate.isSome()) {
-    return BadRequest("Invalid DESTROY operation: " + validate.get().message);
+  if (error.isSome()) {
+    return BadRequest("Invalid DESTROY operation: " + error->message);
   }
 
   return master->authorizeDestroyVolume(operation.destroy(), principal)
@@ -1313,7 +1306,7 @@ Future<Response> Master::Http::_destroyVolumes(
         return Forbidden();
       }
 
-      return _operation(slaveId, volumes, operation);
+      return _operation(slaveId, operation.destroy().volumes(), operation);
     }));
 }
 
@@ -2227,7 +2220,7 @@ Future<Response> Master::Http::reserve(
         parse.error());
   }
 
-  Resources resources;
+  RepeatedPtrField<Resource> resources;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> resource = ::protobuf::parse<Resource>(value);
     if (resource.isError()) {
@@ -2236,16 +2229,7 @@ Future<Response> Master::Http::reserve(
           resource.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(resource.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&resource.get(), POST_RESERVATION_REFINEMENT);
-
-    resources += resource.get();
+    resources.Add()->CopyFrom(resource.get());
   }
 
   return _reserve(slaveId, resources, principal);
@@ -2254,7 +2238,7 @@ Future<Response> Master::Http::reserve(
 
 Future<Response> Master::Http::_reserve(
     const SlaveID& slaveId,
-    const Resources& resources,
+    const RepeatedPtrField<Resource>& resources,
     const Option<Principal>& principal) const
 {
   Slave* slave = master->slaves.registered.get(slaveId);
@@ -2267,13 +2251,18 @@ Future<Response> Master::Http::_reserve(
   operation.set_type(Offer::Operation::RESERVE);
   operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
 
-  Option<Error> error = validation::operation::validate(
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
       operation.reserve(), principal, slave->capabilities);
 
   if (error.isSome()) {
     return BadRequest(
         "Invalid RESERVE operation on agent " + stringify(*slave) + ": " +
-        error.get().message);
+        error->message);
   }
 
   return master->authorizeReserveResources(operation.reserve(), principal)
@@ -2284,7 +2273,8 @@ Future<Response> Master::Http::_reserve(
 
       // We only allow "pushing" a single reservation at a time, so we require
       // the resources with one reservation "popped" to be present on the agent.
-      Resources required = resources.popReservation();
+      Resources required =
+        Resources(operation.reserve().resources()).popReservation();
 
       return _operation(slaveId, required, operation);
     }));
@@ -2299,7 +2289,8 @@ Future<Response> Master::Http::reserveResources(
   CHECK_EQ(mesos::master::Call::RESERVE_RESOURCES, call.type());
 
   const SlaveID& slaveId = call.reserve_resources().slave_id();
-  const Resources& resources = call.reserve_resources().resources();
+  const RepeatedPtrField<Resource>& resources =
+    call.reserve_resources().resources();
 
   return _reserve(slaveId, resources, principal);
 }
@@ -4995,7 +4986,7 @@ Future<Response> Master::Http::unreserve(
         parse.error());
   }
 
-  Resources resources;
+  RepeatedPtrField<Resource> resources;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> resource = ::protobuf::parse<Resource>(value);
     if (resource.isError()) {
@@ -5004,16 +4995,7 @@ Future<Response> Master::Http::unreserve(
           resource.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(resource.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&resource.get(), POST_RESERVATION_REFINEMENT);
-
-    resources += resource.get();
+    resources.Add()->CopyFrom(resource.get());
   }
 
   return _unreserve(slaveId, resources, principal);
@@ -5022,7 +5004,7 @@ Future<Response> Master::Http::unreserve(
 
 Future<Response> Master::Http::_unreserve(
     const SlaveID& slaveId,
-    const Resources& resources,
+    const RepeatedPtrField<Resource>& resources,
     const Option<Principal>& principal) const
 {
   Slave* slave = master->slaves.registered.get(slaveId);
@@ -5035,11 +5017,14 @@ Future<Response> Master::Http::_unreserve(
   operation.set_type(Offer::Operation::UNRESERVE);
   operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
 
-  Option<Error> error = validation::operation::validate(operation.unreserve());
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
 
+  error = validation::operation::validate(operation.unreserve());
   if (error.isSome()) {
-    return BadRequest(
-        "Invalid UNRESERVE operation: " + error.get().message);
+    return BadRequest("Invalid UNRESERVE operation: " + error->message);
   }
 
   return master->authorizeUnreserveResources(operation.unreserve(), principal)
@@ -5048,7 +5033,7 @@ Future<Response> Master::Http::_unreserve(
         return Forbidden();
       }
 
-      return _operation(slaveId, resources, operation);
+      return _operation(slaveId, operation.unreserve().resources(), operation);
     }));
 }
 
@@ -5125,7 +5110,8 @@ Future<Response> Master::Http::unreserveResources(
   CHECK_EQ(mesos::master::Call::UNRESERVE_RESOURCES, call.type());
 
   const SlaveID& slaveId = call.unreserve_resources().slave_id();
-  const Resources& resources = call.unreserve_resources().resources();
+  const RepeatedPtrField<Resource>& resources =
+    call.unreserve_resources().resources();
 
   return _unreserve(slaveId, resources, principal);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9dd6a53..95c2d0f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1397,13 +1397,13 @@ private:
 
     process::Future<process::http::Response> _reserve(
         const SlaveID& slaveId,
-        const Resources& resources,
+        const google::protobuf::RepeatedPtrField<Resource>& resources,
         const Option<process::http::authentication::Principal>&
             principal) const;
 
     process::Future<process::http::Response> _unreserve(
         const SlaveID& slaveId,
-        const Resources& resources,
+        const google::protobuf::RepeatedPtrField<Resource>& resources,
         const Option<process::http::authentication::Principal>&
             principal) const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index f03f119..c778c6c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -102,6 +102,7 @@ using process::Owned;
 using process::PID;
 using process::Promise;
 
+using process::http::Accepted;
 using process::http::OK;
 using process::http::Response;
 using process::http::Unauthorized;
@@ -7776,6 +7777,150 @@ TEST_P(MasterTestPrePostReservationRefinement, StateEndpointPendingTasks)
   driver.join();
 }
 
+
+// This test verifies that an operator can reserve and unreserve
+// resources through the master operator API in both
+// "(pre|post)-reservation-refinement" formats.
+TEST_P(MasterTestPrePostReservationRefinement, ReserveAndUnreserveResourcesV1)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // For capturing the SlaveID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+
+  v1::master::Call v1ReserveResourcesCall;
+  v1ReserveResourcesCall.set_type(v1::master::Call::RESERVE_RESOURCES);
+  v1::master::Call::ReserveResources* reserveResources =
+    v1ReserveResourcesCall.mutable_reserve_resources();
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved =
+    unreserved.pushReservation(createDynamicReservationInfo(
+        DEFAULT_TEST_ROLE, DEFAULT_CREDENTIAL.principal()));
+
+  reserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  reserveResources->mutable_resources()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(dynamicallyReserved)));
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  Future<Response> v1ReserveResourcesResponse = process::http::post(
+    master.get()->pid,
+    "api/v1",
+    createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+    serialize(contentType, v1ReserveResourcesCall),
+    stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Accepted().status, v1ReserveResourcesResponse);
+
+  v1::master::Call v1UnreserveResourcesCall;
+  v1UnreserveResourcesCall.set_type(v1::master::Call::UNRESERVE_RESOURCES);
+  v1::master::Call::UnreserveResources* unreserveResources =
+    v1UnreserveResourcesCall.mutable_unreserve_resources();
+
+  unreserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId));
+
+  unreserveResources->mutable_resources()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(dynamicallyReserved)));
+
+  Future<Response> v1UnreserveResourcesResponse = process::http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1UnreserveResourcesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Accepted().status, v1UnreserveResourcesResponse);
+}
+
+
+// This test verifies that an operator can create and destroy
+// persistent volumes through the master operator API in both
+// "(pre|post)-reservation-refinement" formats.
+TEST_P(MasterTestPrePostReservationRefinement, CreateAndDestroyVolumesV1)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // For capturing the SlaveID so we can use it in the create/destroy volumes
+  // API call.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  // Do Static reservation so we can create persistent volumes from it.
+  slaveFlags.resources = "disk(role1):1024";
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+
+  // Create the persistent volume.
+  v1::master::Call v1CreateVolumesCall;
+  v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
+  v1::master::Call_CreateVolumes* createVolumes =
+    v1CreateVolumesCall.mutable_create_volumes();
+
+  Resources volume = createPersistentVolume(
+      Megabytes(64),
+      "role1",
+      "id1",
+      "path1",
+      None(),
+      None(),
+      DEFAULT_CREDENTIAL.principal());
+
+  createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  createVolumes->mutable_volumes()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(volume)));
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  Future<Response> v1CreateVolumesResponse = process::http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1CreateVolumesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, v1CreateVolumesResponse);
+
+  // Destroy the persistent volume.
+  v1::master::Call v1DestroyVolumesCall;
+  v1DestroyVolumesCall.set_type(v1::master::Call::DESTROY_VOLUMES);
+  v1::master::Call_DestroyVolumes* destroyVolumes =
+    v1DestroyVolumesCall.mutable_destroy_volumes();
+
+  destroyVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  destroyVolumes->mutable_volumes()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(volume)));
+
+  Future<Response> v1DestroyVolumesResponse = process::http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1DestroyVolumesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, v1DestroyVolumesResponse);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 518bdf8..a66bde2 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -506,9 +506,9 @@ TEST_F(PersistentVolumeEndpointsTest, InvalidVolume)
         body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(
+    ASSERT_TRUE(strings::contains(
         response->body,
-        "Invalid reservation: role \"*\" cannot be reserved");
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 
   {
@@ -519,9 +519,9 @@ TEST_F(PersistentVolumeEndpointsTest, InvalidVolume)
         body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(
+    ASSERT_TRUE(strings::contains(
         response->body,
-        "Invalid reservation: role \"*\" cannot be reserved");
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index f710a18..05b505f 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -811,8 +811,9 @@ TEST_F(ReservationEndpointsTest, InvalidResource)
       process::http::post(master.get()->pid, "reserve", headers, body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(response->body,
-             "Invalid reservation: role \"*\" cannot be reserved");
+    ASSERT_TRUE(strings::contains(
+        response->body,
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 
   {
@@ -820,8 +821,9 @@ TEST_F(ReservationEndpointsTest, InvalidResource)
       process::http::post(master.get()->pid, "unreserve", headers, body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(response->body,
-             "Invalid reservation: role \"*\" cannot be reserved");
+    ASSERT_TRUE(strings::contains(
+        response->body,
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 }
 


[3/4] mesos git commit: Updated `accept` to perform operation adjustment in one place.

Posted by mp...@apache.org.
Updated `accept` to perform operation adjustment in one place.

Previously, the minor adjustments made to operations were scattered
across several places in `accept` and `_accept`.

The "executor-injection" for LAUNCH_GROUP was at the beginning of
`accept`, "allocation-info-injection" for MULTI_ROLE was after offer
validation, and "health-check-injection" for LAUNCH was in `_accept`.

The `Master::accept` function is now broken down into distinct
"metrics accounting", "offer validation", "operation-adjustments", and
"authorization" stages.

Review: https://reviews.apache.org/r/60562


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/710b7217
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/710b7217
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/710b7217

Branch: refs/heads/master
Commit: 710b72179938cac2100c90277ce7ced5c8ca3401
Parents: 21de4b6
Author: Michael Park <mp...@apache.org>
Authored: Thu Jun 29 20:53:59 2017 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Jul 5 03:41:37 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 133 +++++++++++++++++++++++++++------------------
 1 file changed, 80 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/710b7217/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9d5566b..da0a13b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3805,7 +3805,8 @@ void Master::accept(
 {
   CHECK_NOTNULL(framework);
 
-  foreach (Offer::Operation& operation, *accept.mutable_operations()) {
+  // Bump metrics.
+  foreach (const Offer::Operation& operation, accept.operations()) {
     if (operation.type() == Offer::Operation::LAUNCH) {
       if (operation.launch().task_infos().size() > 0) {
         ++metrics->messages_launch_tasks;
@@ -3815,22 +3816,9 @@ void Master::accept(
                      << " in ACCEPT call for framework " << framework->id()
                      << " as the launch operation specified no tasks";
       }
-    } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
-      const ExecutorInfo& executor = operation.launch_group().executor();
-
-      TaskGroupInfo* taskGroup =
-        operation.mutable_launch_group()->mutable_task_group();
-
-      // Mutate `TaskInfo` to include `ExecutorInfo` to make it easy
-      // for operator API and WebUI to get access to the corresponding
-      // executor for tasks in the task group.
-      foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
-        if (!task.has_executor()) {
-          task.mutable_executor()->CopyFrom(executor);
-        }
-      }
     }
 
+    // TODO(mpark): Add metrics for LAUNCH_GROUP operation.
     // TODO(jieyu): Add metrics for non launch operations.
   }
 
@@ -3937,14 +3925,85 @@ void Master::accept(
     return;
   }
 
-  CHECK_SOME(allocationInfo);
-
-  // With the addition of the MULTI_ROLE capability, the resources
-  // within an offer now contain an `AllocationInfo`. We therefore
-  // inject the offer's allocation info into the operation's
-  // resources if the scheduler has not done so already.
+  // We make various adjustments to the `Offer::Operation`s,
+  // typically for backward/forward compatibility.
+  // TODO(mpark): Pull this out to a master normalization utility.
   foreach (Offer::Operation& operation, *accept.mutable_operations()) {
+    // With the addition of the MULTI_ROLE capability, the resources
+    // within an offer now contain an `AllocationInfo`. We therefore
+    // inject the offer's allocation info into the operation's
+    // resources if the scheduler has not done so already.
+    CHECK_SOME(allocationInfo);
     protobuf::injectAllocationInfo(&operation, allocationInfo.get());
+
+    switch (operation.type()) {
+      case Offer::Operation::RESERVE:
+      case Offer::Operation::UNRESERVE:
+      case Offer::Operation::CREATE:
+      case Offer::Operation::DESTROY: {
+        // No-op.
+        break;
+      }
+      case Offer::Operation::LAUNCH: {
+        foreach (
+            TaskInfo& task, *operation.mutable_launch()->mutable_task_infos()) {
+          // TODO(haosdent): Once we have internal `TaskInfo` separate from
+          // the v0 `TaskInfo` (see MESOS-6268), consider extracting the
+          // following adaptation code into devolve methods from v0 and v1
+          // `TaskInfo` to internal `TaskInfo`.
+          //
+          // Make a copy of the original task so that we can fill the missing
+          // `framework_id` in `ExecutorInfo` if needed. This field was added
+          // to the API later and thus was made optional.
+          if (task.has_executor() && !task.executor().has_framework_id()) {
+            task.mutable_executor()->mutable_framework_id()->CopyFrom(
+                framework->id());
+          }
+
+          // For backwards compatibility with the v0 and v1 API, when
+          // the type of the health check is not specified, determine
+          // its type from the `http` and `command` fields.
+          //
+          // TODO(haosdent): Remove this after the deprecation cycle which
+          // starts in 2.0.
+          if (task.has_health_check() && !task.health_check().has_type()) {
+            LOG(WARNING) << "The type of health check is not set; use of "
+                         << "'HealthCheck' without specifying 'type' will be "
+                         << "deprecated in Mesos 2.0";
+
+            const HealthCheck& healthCheck = task.health_check();
+            if (healthCheck.has_command() && !healthCheck.has_http()) {
+              task.mutable_health_check()->set_type(HealthCheck::COMMAND);
+            } else if (healthCheck.has_http() && !healthCheck.has_command()) {
+              task.mutable_health_check()->set_type(HealthCheck::HTTP);
+            }
+          }
+        }
+
+        break;
+      }
+      case Offer::Operation::LAUNCH_GROUP: {
+        const ExecutorInfo& executor = operation.launch_group().executor();
+
+        TaskGroupInfo* taskGroup =
+          operation.mutable_launch_group()->mutable_task_group();
+
+        // Mutate `TaskInfo` to include `ExecutorInfo` to make it easy
+        // for operator API and WebUI to get access to the corresponding
+        // executor for tasks in the task group.
+        foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
+          if (!task.has_executor()) {
+            task.mutable_executor()->CopyFrom(executor);
+          }
+        }
+
+        break;
+      }
+      case Offer::Operation::UNKNOWN: {
+        // No-op.
+        break;
+      }
+    }
   }
 
   CHECK_SOME(slaveId);
@@ -4556,38 +4615,6 @@ void Master::_accept(
 
           // Validate the task.
 
-          // TODO(haosdent): Once we have internal `TaskInfo` separate from
-          // the v0 `TaskInfo` (see MESOS-6268), consider extracting the
-          // following adaptation code into devolve methods from v0 and v1
-          // `TaskInfo` to internal `TaskInfo`.
-          //
-          // Make a copy of the original task so that we can fill the missing
-          // `framework_id` in `ExecutorInfo` if needed. This field was added
-          // to the API later and thus was made optional.
-          if (task.has_executor() && !task.executor().has_framework_id()) {
-            task.mutable_executor()->mutable_framework_id()->CopyFrom(
-                framework->id());
-          }
-
-          // For backwards compatibility with the v0 and v1 API, when
-          // the type of the health check is not specified, determine
-          // its type from the `http` and `command` fields.
-          //
-          // TODO(haosdent): Remove this after the deprecation cycle which
-          // starts in 2.0.
-          if (task.has_health_check() && !task.health_check().has_type()) {
-            LOG(WARNING) << "The type of health check is not set; use of "
-                         << "'HealthCheck' without specifying 'type' will be "
-                         << "deprecated in Mesos 2.0";
-
-            const HealthCheck& healthCheck = task.health_check();
-            if (healthCheck.has_command() && !healthCheck.has_http()) {
-              task.mutable_health_check()->set_type(HealthCheck::COMMAND);
-            } else if (healthCheck.has_http() && !healthCheck.has_command()) {
-              task.mutable_health_check()->set_type(HealthCheck::HTTP);
-            }
-          }
-
           // We add back offered shared resources for validation even if they
           // are already consumed by other tasks in the same ACCEPT call. This
           // allows these tasks to use more copies of the same shared resource