You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/07/05 10:54:12 UTC

[4/4] mesos git commit: Validated and normalized resources on the V1 operator API path.

Validated and normalized resources on the V1 operator API path.

Previously, validation and normalization of resources were incorrectly
performed only on the V0 operator API path. This patch also performs
them on the V1 operator API path by changing the common codepath taken
by V0 and V1. We also update the V0 codepath to defer the construction
of `Resources` until we get to the common codepath.

Review: https://reviews.apache.org/r/60566


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2617482f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2617482f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2617482f

Branch: refs/heads/master
Commit: 2617482f0b2b446dfa4b42c85ce5356fefe4f0b7
Parents: e097f21
Author: Michael Park <mp...@apache.org>
Authored: Fri Jun 30 02:37:10 2017 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Wed Jul 5 03:42:58 2017 -0700

----------------------------------------------------------------------
 src/master/http.cpp                             | 112 +++++++-------
 src/master/master.hpp                           |   4 +-
 src/tests/master_tests.cpp                      | 145 +++++++++++++++++++
 src/tests/persistent_volume_endpoints_tests.cpp |   8 +-
 src/tests/reservation_endpoints_tests.cpp       |  10 +-
 5 files changed, 206 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 64b7cdd..175a44c 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1080,7 +1080,7 @@ Future<Response> Master::Http::createVolumes(
         parse.error());
   }
 
-  Resources volumes;
+  RepeatedPtrField<Resource> volumes;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> volume = ::protobuf::parse<Resource>(value);
     if (volume.isError()) {
@@ -1089,16 +1089,7 @@ Future<Response> Master::Http::createVolumes(
           volume.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(volume.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&volume.get(), POST_RESERVATION_REFINEMENT);
-
-    volumes += volume.get();
+    volumes.Add()->CopyFrom(volume.get());
   }
 
   return _createVolumes(slaveId, volumes, principal);
@@ -1120,16 +1111,21 @@ Future<Response> Master::Http::_createVolumes(
   operation.set_type(Offer::Operation::CREATE);
   operation.mutable_create()->mutable_volumes()->CopyFrom(volumes);
 
-  Option<Error> validate = validation::operation::validate(
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
       operation.create(),
       slave->checkpointedResources,
       principal,
       slave->capabilities);
 
-  if (validate.isSome()) {
+  if (error.isSome()) {
     return BadRequest(
         "Invalid CREATE operation on agent " + stringify(*slave) + ": " +
-        validate.get().message);
+        error->message);
   }
 
   return master->authorizeCreateVolume(operation.create(), principal)
@@ -1141,7 +1137,8 @@ Future<Response> Master::Http::_createVolumes(
       // The resources required for this operation are equivalent to the
       // volumes specified by the user minus any DiskInfo (DiskInfo will
       // be created when this operation is applied).
-      return _operation(slaveId, removeDiskInfos(volumes), operation);
+      return _operation(
+          slaveId, removeDiskInfos(operation.create().volumes()), operation);
     }));
 }
 
@@ -1257,7 +1254,7 @@ Future<Response> Master::Http::destroyVolumes(
         parse.error());
   }
 
-  Resources volumes;
+  RepeatedPtrField<Resource> volumes;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> volume = ::protobuf::parse<Resource>(value);
     if (volume.isError()) {
@@ -1266,16 +1263,7 @@ Future<Response> Master::Http::destroyVolumes(
           volume.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(volume.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&volume.get(), POST_RESERVATION_REFINEMENT);
-
-    volumes += volume.get();
+    volumes.Add()->CopyFrom(volume.get());
   }
 
   return _destroyVolumes(slaveId, volumes, principal);
@@ -1297,14 +1285,19 @@ Future<Response> Master::Http::_destroyVolumes(
   operation.set_type(Offer::Operation::DESTROY);
   operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
 
-  Option<Error> validate = validation::operation::validate(
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
       operation.destroy(),
       slave->checkpointedResources,
       slave->usedResources,
       slave->pendingTasks);
 
-  if (validate.isSome()) {
-    return BadRequest("Invalid DESTROY operation: " + validate.get().message);
+  if (error.isSome()) {
+    return BadRequest("Invalid DESTROY operation: " + error->message);
   }
 
   return master->authorizeDestroyVolume(operation.destroy(), principal)
@@ -1313,7 +1306,7 @@ Future<Response> Master::Http::_destroyVolumes(
         return Forbidden();
       }
 
-      return _operation(slaveId, volumes, operation);
+      return _operation(slaveId, operation.destroy().volumes(), operation);
     }));
 }
 
@@ -2227,7 +2220,7 @@ Future<Response> Master::Http::reserve(
         parse.error());
   }
 
-  Resources resources;
+  RepeatedPtrField<Resource> resources;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> resource = ::protobuf::parse<Resource>(value);
     if (resource.isError()) {
@@ -2236,16 +2229,7 @@ Future<Response> Master::Http::reserve(
           resource.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(resource.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&resource.get(), POST_RESERVATION_REFINEMENT);
-
-    resources += resource.get();
+    resources.Add()->CopyFrom(resource.get());
   }
 
   return _reserve(slaveId, resources, principal);
@@ -2254,7 +2238,7 @@ Future<Response> Master::Http::reserve(
 
 Future<Response> Master::Http::_reserve(
     const SlaveID& slaveId,
-    const Resources& resources,
+    const RepeatedPtrField<Resource>& resources,
     const Option<Principal>& principal) const
 {
   Slave* slave = master->slaves.registered.get(slaveId);
@@ -2267,13 +2251,18 @@ Future<Response> Master::Http::_reserve(
   operation.set_type(Offer::Operation::RESERVE);
   operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
 
-  Option<Error> error = validation::operation::validate(
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
       operation.reserve(), principal, slave->capabilities);
 
   if (error.isSome()) {
     return BadRequest(
         "Invalid RESERVE operation on agent " + stringify(*slave) + ": " +
-        error.get().message);
+        error->message);
   }
 
   return master->authorizeReserveResources(operation.reserve(), principal)
@@ -2284,7 +2273,8 @@ Future<Response> Master::Http::_reserve(
 
       // We only allow "pushing" a single reservation at a time, so we require
       // the resources with one reservation "popped" to be present on the agent.
-      Resources required = resources.popReservation();
+      Resources required =
+        Resources(operation.reserve().resources()).popReservation();
 
       return _operation(slaveId, required, operation);
     }));
@@ -2299,7 +2289,8 @@ Future<Response> Master::Http::reserveResources(
   CHECK_EQ(mesos::master::Call::RESERVE_RESOURCES, call.type());
 
   const SlaveID& slaveId = call.reserve_resources().slave_id();
-  const Resources& resources = call.reserve_resources().resources();
+  const RepeatedPtrField<Resource>& resources =
+    call.reserve_resources().resources();
 
   return _reserve(slaveId, resources, principal);
 }
@@ -4995,7 +4986,7 @@ Future<Response> Master::Http::unreserve(
         parse.error());
   }
 
-  Resources resources;
+  RepeatedPtrField<Resource> resources;
   foreach (const JSON::Value& value, parse.get().values) {
     Try<Resource> resource = ::protobuf::parse<Resource>(value);
     if (resource.isError()) {
@@ -5004,16 +4995,7 @@ Future<Response> Master::Http::unreserve(
           resource.error());
     }
 
-    // Since the `+=` operator will silently drop invalid resources, we validate
-    // each resource individually.
-    Option<Error> error = Resources::validate(resource.get());
-    if (error.isSome()) {
-      return BadRequest(error.get().message);
-    }
-
-    convertResourceFormat(&resource.get(), POST_RESERVATION_REFINEMENT);
-
-    resources += resource.get();
+    resources.Add()->CopyFrom(resource.get());
   }
 
   return _unreserve(slaveId, resources, principal);
@@ -5022,7 +5004,7 @@ Future<Response> Master::Http::unreserve(
 
 Future<Response> Master::Http::_unreserve(
     const SlaveID& slaveId,
-    const Resources& resources,
+    const RepeatedPtrField<Resource>& resources,
     const Option<Principal>& principal) const
 {
   Slave* slave = master->slaves.registered.get(slaveId);
@@ -5035,11 +5017,14 @@ Future<Response> Master::Http::_unreserve(
   operation.set_type(Offer::Operation::UNRESERVE);
   operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
 
-  Option<Error> error = validation::operation::validate(operation.unreserve());
+  Option<Error> error = validateAndNormalizeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
 
+  error = validation::operation::validate(operation.unreserve());
   if (error.isSome()) {
-    return BadRequest(
-        "Invalid UNRESERVE operation: " + error.get().message);
+    return BadRequest("Invalid UNRESERVE operation: " + error->message);
   }
 
   return master->authorizeUnreserveResources(operation.unreserve(), principal)
@@ -5048,7 +5033,7 @@ Future<Response> Master::Http::_unreserve(
         return Forbidden();
       }
 
-      return _operation(slaveId, resources, operation);
+      return _operation(slaveId, operation.unreserve().resources(), operation);
     }));
 }
 
@@ -5125,7 +5110,8 @@ Future<Response> Master::Http::unreserveResources(
   CHECK_EQ(mesos::master::Call::UNRESERVE_RESOURCES, call.type());
 
   const SlaveID& slaveId = call.unreserve_resources().slave_id();
-  const Resources& resources = call.unreserve_resources().resources();
+  const RepeatedPtrField<Resource>& resources =
+    call.unreserve_resources().resources();
 
   return _unreserve(slaveId, resources, principal);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9dd6a53..95c2d0f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1397,13 +1397,13 @@ private:
 
     process::Future<process::http::Response> _reserve(
         const SlaveID& slaveId,
-        const Resources& resources,
+        const google::protobuf::RepeatedPtrField<Resource>& resources,
         const Option<process::http::authentication::Principal>&
             principal) const;
 
     process::Future<process::http::Response> _unreserve(
         const SlaveID& slaveId,
-        const Resources& resources,
+        const google::protobuf::RepeatedPtrField<Resource>& resources,
         const Option<process::http::authentication::Principal>&
             principal) const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index f03f119..c778c6c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -102,6 +102,7 @@ using process::Owned;
 using process::PID;
 using process::Promise;
 
+using process::http::Accepted;
 using process::http::OK;
 using process::http::Response;
 using process::http::Unauthorized;
@@ -7776,6 +7777,150 @@ TEST_P(MasterTestPrePostReservationRefinement, StateEndpointPendingTasks)
   driver.join();
 }
 
+
+// This test verifies that an operator can reserve and unreserve
+// resources through the master operator API in both
+// "(pre|post)-reservation-refinement" formats.
+TEST_P(MasterTestPrePostReservationRefinement, ReserveAndUnreserveResourcesV1)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // For capturing the SlaveID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+
+  v1::master::Call v1ReserveResourcesCall;
+  v1ReserveResourcesCall.set_type(v1::master::Call::RESERVE_RESOURCES);
+  v1::master::Call::ReserveResources* reserveResources =
+    v1ReserveResourcesCall.mutable_reserve_resources();
+
+  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
+  Resources dynamicallyReserved =
+    unreserved.pushReservation(createDynamicReservationInfo(
+        DEFAULT_TEST_ROLE, DEFAULT_CREDENTIAL.principal()));
+
+  reserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  reserveResources->mutable_resources()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(dynamicallyReserved)));
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  Future<Response> v1ReserveResourcesResponse = process::http::post(
+    master.get()->pid,
+    "api/v1",
+    createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+    serialize(contentType, v1ReserveResourcesCall),
+    stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Accepted().status, v1ReserveResourcesResponse);
+
+  v1::master::Call v1UnreserveResourcesCall;
+  v1UnreserveResourcesCall.set_type(v1::master::Call::UNRESERVE_RESOURCES);
+  v1::master::Call::UnreserveResources* unreserveResources =
+    v1UnreserveResourcesCall.mutable_unreserve_resources();
+
+  unreserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId));
+
+  unreserveResources->mutable_resources()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(dynamicallyReserved)));
+
+  Future<Response> v1UnreserveResourcesResponse = process::http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1UnreserveResourcesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      Accepted().status, v1UnreserveResourcesResponse);
+}
+
+
+// This test verifies that an operator can create and destroy
+// persistent volumes through the master operator API in both
+// "(pre|post)-reservation-refinement" formats.
+TEST_P(MasterTestPrePostReservationRefinement, CreateAndDestroyVolumesV1)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // For capturing the SlaveID so we can use it in the create/destroy volumes
+  // API call.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  // Do Static reservation so we can create persistent volumes from it.
+  slaveFlags.resources = "disk(role1):1024";
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+
+  // Create the persistent volume.
+  v1::master::Call v1CreateVolumesCall;
+  v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
+  v1::master::Call_CreateVolumes* createVolumes =
+    v1CreateVolumesCall.mutable_create_volumes();
+
+  Resources volume = createPersistentVolume(
+      Megabytes(64),
+      "role1",
+      "id1",
+      "path1",
+      None(),
+      None(),
+      DEFAULT_CREDENTIAL.principal());
+
+  createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  createVolumes->mutable_volumes()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(volume)));
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  Future<Response> v1CreateVolumesResponse = process::http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1CreateVolumesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, v1CreateVolumesResponse);
+
+  // Destroy the persistent volume.
+  v1::master::Call v1DestroyVolumesCall;
+  v1DestroyVolumesCall.set_type(v1::master::Call::DESTROY_VOLUMES);
+  v1::master::Call_DestroyVolumes* destroyVolumes =
+    v1DestroyVolumesCall.mutable_destroy_volumes();
+
+  destroyVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  destroyVolumes->mutable_volumes()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(volume)));
+
+  Future<Response> v1DestroyVolumesResponse = process::http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1DestroyVolumesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, v1DestroyVolumesResponse);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 518bdf8..a66bde2 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -506,9 +506,9 @@ TEST_F(PersistentVolumeEndpointsTest, InvalidVolume)
         body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(
+    ASSERT_TRUE(strings::contains(
         response->body,
-        "Invalid reservation: role \"*\" cannot be reserved");
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 
   {
@@ -519,9 +519,9 @@ TEST_F(PersistentVolumeEndpointsTest, InvalidVolume)
         body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(
+    ASSERT_TRUE(strings::contains(
         response->body,
-        "Invalid reservation: role \"*\" cannot be reserved");
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2617482f/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index f710a18..05b505f 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -811,8 +811,9 @@ TEST_F(ReservationEndpointsTest, InvalidResource)
       process::http::post(master.get()->pid, "reserve", headers, body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(response->body,
-             "Invalid reservation: role \"*\" cannot be reserved");
+    ASSERT_TRUE(strings::contains(
+        response->body,
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 
   {
@@ -820,8 +821,9 @@ TEST_F(ReservationEndpointsTest, InvalidResource)
       process::http::post(master.get()->pid, "unreserve", headers, body);
 
     AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-    ASSERT_EQ(response->body,
-             "Invalid reservation: role \"*\" cannot be reserved");
+    ASSERT_TRUE(strings::contains(
+        response->body,
+        "Invalid reservation: role \"*\" cannot be reserved"));
   }
 }