You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/08 02:34:43 UTC

[1/4] mesos git commit: Tested reconciliation when operation is dropped en route to agent.

Repository: mesos
Updated Branches:
  refs/heads/master fbe806869 -> 93f48fd52


Tested reconciliation when operation is dropped en route to agent.

This patch adds
'StorageLocalResourceProviderTest.ROOT_ReconcileDroppedOperation'
in order to verify that reconciliation is performed correctly
when an operation is dropped on its way from the master to the
agent.

Review: https://reviews.apache.org/r/65039


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/93f48fd5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/93f48fd5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/93f48fd5

Branch: refs/heads/master
Commit: 93f48fd52074154074bfd4b4aa6ff7855034e609
Parents: e815417
Author: Greg Mann <gr...@gmail.com>
Authored: Wed Feb 7 14:53:41 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 219 +++++++++++++++++++
 1 file changed, 219 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/93f48fd5/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index fcac4c4..f390308 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -27,6 +27,8 @@
 
 #include "linux/fs.hpp"
 
+#include "master/detector/standalone.hpp"
+
 #include "module/manager.hpp"
 
 #include "slave/container_daemon_process.hpp"
@@ -47,11 +49,13 @@ using std::vector;
 using mesos::internal::slave::ContainerDaemonProcess;
 
 using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
 
 using process::Clock;
 using process::Future;
 using process::Owned;
 
+using testing::AtMost;
 using testing::Sequence;
 
 namespace mesos {
@@ -3048,6 +3052,221 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics)
       prefix + "csi_node_plugin_terminations"));
 }
 
+
+// Master reconciles operations that are missing from a re-registering agent.
+// One of two `ApplyOperationMessage`s is dropped on the way to the agent, so
+// reconciliation should yield OPERATION_DROPPED for it while the other one
+// finishes. NOTE(review): the body only applies `CREATE_VOLUME` to the two
+// pre-existing RAW volumes; no agent default resource operation is exercised.
+TEST_F(StorageLocalResourceProviderTest, ROOT_ReconcileDroppedOperation)
+{
+  Clock::pause();
+
+  setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Since the local resource provider daemon is started after the agent is
+  // registered, it is guaranteed that the agent will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from the
+  // storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by the
+  // plugin container, which runs in another Linux process. Since we do not have
+  // a `Future` linked to the standalone container launch to await on, it is
+  // difficult to accomplish this without resuming the clock.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // We are only interested in pre-existing volumes, which have IDs but no
+  // profile. We use pre-existing volumes to make it easy to send multiple
+  // operations on multiple resources.
+  auto isPreExistingVolume = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_id() &&
+      !r.disk().source().has_profile();
+  };
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(filters));
+
+  Future<vector<Offer>> offersBeforeOperations;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      isPreExistingVolume)))
+    .WillOnce(FutureArg<1>(&offersBeforeOperations))
+    .WillRepeatedly(DeclineOffers(filters)); // Decline further matching offers.
+
+  driver.start();
+
+  AWAIT_READY(offersBeforeOperations);
+  ASSERT_FALSE(offersBeforeOperations->empty());
+
+  vector<Resource> sources;
+
+  foreach (
+      const Resource& resource,
+      offersBeforeOperations->at(0).resources()) {
+    if (isPreExistingVolume(resource) &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      sources.push_back(resource);
+    }
+  }
+
+  ASSERT_EQ(2u, sources.size());
+
+  // Drop one of the operations on the way to the agent.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), _, _);
+
+  // The successful operation will result in a terminal update.
+  Future<UpdateOperationStatusMessage> operationFinishedStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  // Attempt the creation of two volumes.
+  driver.acceptOffers(
+      {offersBeforeOperations->at(0).id()},
+      {CREATE_VOLUME(sources.at(0), Resource::DiskInfo::Source::MOUNT),
+       CREATE_VOLUME(sources.at(1), Resource::DiskInfo::Source::MOUNT)},
+      filters);
+
+  // Ensure that the operations are processed.
+  Clock::settle();
+
+  AWAIT_READY(applyOperationMessage);
+  AWAIT_READY(operationFinishedStatus);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Observe explicit operation reconciliation between master and agent.
+  Future<ReconcileOperationsMessage> reconcileOperationsMessage =
+    FUTURE_PROTOBUF(ReconcileOperationsMessage(), _, _);
+  Future<UpdateOperationStatusMessage> operationDroppedStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  // The master may send an offer with the agent's resources after the agent
+  // reregisters, but before an `UpdateSlaveMessage` is sent containing the
+  // resource provider's resources. In this case, the offer will be rescinded.
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(AtMost(1));
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector.appoint(master.get()->pid);
+
+  // Advance the clock to trigger agent re-registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveReregisteredMessage);
+  AWAIT_READY(reconcileOperationsMessage);
+  AWAIT_READY(operationDroppedStatus);
+
+  std::set<OperationState> expectedStates =
+    {OperationState::OPERATION_DROPPED,
+     OperationState::OPERATION_FINISHED};
+
+  std::set<OperationState> observedStates =
+    {operationFinishedStatus->status().state(),
+     operationDroppedStatus->status().state()};
+
+  ASSERT_EQ(expectedStates, observedStates);
+
+  Future<vector<Offer>> offersAfterOperations;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      isPreExistingVolume)))
+    .WillOnce(FutureArg<1>(&offersAfterOperations));
+
+  // Advance the clock to trigger a batch allocation.
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterOperations);
+  ASSERT_FALSE(offersAfterOperations->empty());
+
+  vector<Resource> converted;
+
+  foreach (const Resource& resource, offersAfterOperations->at(0).resources()) {
+    if (isPreExistingVolume(resource) &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
+      converted.push_back(resource);
+    }
+  }
+
+  ASSERT_EQ(1u, converted.size());
+
+  // TODO(greggomann): Add inspection of dropped operation metrics here once
+  // such metrics have been added. See MESOS-8406.
+
+  // Settle the clock to ensure that unexpected messages will cause errors.
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[4/4] mesos git commit: Added a method to increment invalid scheduler API call counters.

Posted by gr...@apache.org.
Added a method to increment invalid scheduler API call counters.

Review: https://reviews.apache.org/r/65362/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/36aa75bd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/36aa75bd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/36aa75bd

Branch: refs/heads/master
Commit: 36aa75bd0abc7e7d18fd9ff55f26074e000716e0
Parents: fbe8068
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Feb 7 14:18:22 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800

----------------------------------------------------------------------
 src/master/http.cpp    |  1 +
 src/master/master.cpp  |  1 +
 src/master/metrics.cpp | 14 ++++++++++++++
 src/master/metrics.hpp |  2 ++
 4 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index c489b6f..46f2872 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -980,6 +980,7 @@ Future<Response> Master::Http::scheduler(
   Option<Error> error = validation::scheduler::call::validate(call, principal);
 
   if (error.isSome()) {
+    master->metrics->incrementInvalidSchedulerCalls(call);
     return BadRequest("Failed to validate scheduler::Call: " +
                       error.get().message);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc2685a..d7d2286 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2336,6 +2336,7 @@ void Master::receive(
   Option<Error> error = validation::scheduler::call::validate(call);
 
   if (error.isSome()) {
+    metrics->incrementInvalidSchedulerCalls(call);
     drop(from, call, error->message);
     return;
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 64fc829..b627372 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -481,6 +481,20 @@ Metrics::~Metrics()
 }
 
 
+// Counts a scheduler call that failed validation, keyed by the call's type.
+void Metrics::incrementInvalidSchedulerCalls(const scheduler::Call& call) {
+  // Only call types with a dedicated counter are tracked; others are ignored.
+  //
+  // TODO(gkleiman): Increment other metrics when we add counters for all
+  // the different types of scheduler calls. See MESOS-8533.
+  if (call.type() == scheduler::Call::MESSAGE) {
+    invalid_framework_to_executor_messages++;
+  } else if (call.type() == scheduler::Call::ACKNOWLEDGE) {
+    invalid_status_update_acknowledgements++;
+  }
+}
+
+
 void Metrics::incrementTasksStates(
     const TaskState& state,
     const TaskStatus::Source& source,

http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index f701efe..b343904 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -201,6 +201,8 @@ struct Metrics
   std::vector<process::metrics::Gauge> resources_revocable_used;
   std::vector<process::metrics::Gauge> resources_revocable_percent;
 
+  void incrementInvalidSchedulerCalls(const scheduler::Call& call);
+
   void incrementTasksStates(
       const TaskState& state,
       const TaskStatus::Source& source,


[3/4] mesos git commit: Added test for delayed authorization during operator events.

Posted by gr...@apache.org.
Added test for delayed authorization during operator events.

Until the fix for MESOS-8469, it was possible for the master
operator event stream to drop events, if event-related state in
the master changed in between asynchronous calls.

This patch adds `MasterAPITest.EventAuthorizationDelayed` to
verify the fix for that issue.

Review: https://reviews.apache.org/r/65316/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e815417b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e815417b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e815417b

Branch: refs/heads/master
Commit: e815417b235f9102f7740c55b700af6788bfcabb
Parents: 2776724
Author: Greg Mann <gr...@mesosphere.io>
Authored: Wed Feb 7 14:51:02 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800

----------------------------------------------------------------------
 src/tests/api_tests.cpp | 244 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 244 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e815417b/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index b639a4b..43517b5 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -103,6 +103,7 @@ using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
 using testing::Return;
+using testing::Sequence;
 using testing::WithParamInterface;
 
 namespace mesos {
@@ -2559,6 +2560,249 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
 }
 
 
+// Operator API events are sent using an asynchronous call chain. When
+// event-related state changes in the master before the authorizer returns, the
+// continuations which actually send the event should still have a consistent
+// view of the master state from the time when the event occurred. This test
+// forces task removal in the master before the authorizer returns in order to
+// verify that events are sent correctly in that case.
+TEST_P(MasterAPITest, EventAuthorizationDelayed)
+{
+  Clock::pause();
+
+  ContentType contentType = GetParam();
+
+  MockAuthorizer authorizer;
+  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+  auto executor = std::make_shared<v1::MockHTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      contentType,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  // Create an event stream after seeing the first offer but before a task is
+  // launched. We should see one framework, one agent, and no tasks/executors.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  Future<Result<v1::master::Event>> event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  const v1::master::Response::GetState& getState =
+    event->get().subscribed().get_state();
+
+  EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
+  EXPECT_EQ(1, getState.get_agents().agents_size());
+  EXPECT_TRUE(getState.get_tasks().tasks().empty());
+  EXPECT_TRUE(getState.get_executors().executors().empty());
+
+  event = decoder.read();
+
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  event = decoder.read();
+  EXPECT_TRUE(event.isPending());
+
+  // When the authorizer is called, return pending futures
+  // that we can satisfy later.
+  Promise<Owned<ObjectApprover>> taskAddedApprover;
+  Promise<Owned<ObjectApprover>> updateRunningApprover;
+  Promise<Owned<ObjectApprover>> updateFinishedApprover;
+
+  Sequence approverSequence;
+
+  // Each event results in 4 calls into the authorizer.
+  // NOTE: This may change when the operator event stream code is refactored
+  // to avoid unnecessary authorizer calls. See MESOS-8475.
+  EXPECT_CALL(authorizer, getObjectApprover(_, _))
+    .Times(4)
+    .InSequence(approverSequence)
+    .WillRepeatedly(Return(taskAddedApprover.future()));
+  EXPECT_CALL(authorizer, getObjectApprover(_, _))
+    .Times(4)
+    .InSequence(approverSequence)
+    .WillRepeatedly(Return(updateRunningApprover.future()));
+  EXPECT_CALL(authorizer, getObjectApprover(_, _))
+    .Times(4)
+    .InSequence(approverSequence)
+    .WillRepeatedly(Return(updateFinishedApprover.future()));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::AgentID slaveId(offer.agent_id());
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId))
+    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId));
+
+  // Capture the acknowledgement messages to the agent so that we can delay the
+  // authorizer until the master has processed the terminal acknowledgement.
+  // NOTE: These calls are in reverse order because they use `EXPECT_CALL` under
+  // the hood, and such expectations are evaluated in reverse order.
+  Future<StatusUpdateAcknowledgementMessage> acknowledgeFinished =
+    FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(),
+        master.get()->pid,
+        slave.get()->pid);
+  Future<StatusUpdateAcknowledgementMessage> acknowledgeRunning =
+    FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(),
+        master.get()->pid,
+        slave.get()->pid);
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(v1::executor::SendUpdateFromTask(
+        frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+  EXPECT_CALL(*executor, acknowledged(_, _))
+    .WillOnce(v1::executor::SendUpdateFromTaskID(
+        frameworkId, evolve(executorId), v1::TASK_FINISHED))
+    .WillOnce(Return());
+
+  TaskInfo task = createTask(devolve(offer), "", executorId);
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::LAUNCH({evolve(task)})}));
+
+  // Wait until the task has finished and task update acknowledgements
+  // have been processed to allow the authorizer to return.
+  AWAIT_READY(acknowledgeRunning);
+  AWAIT_READY(acknowledgeFinished);
+
+  {
+    taskAddedApprover.set(Owned<ObjectApprover>(new AcceptingObjectApprover()));
+
+    AWAIT_READY(event);
+
+    ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
+    ASSERT_EQ(evolve(task.task_id()),
+              event->get().task_added().task().task_id());
+  }
+
+  event = decoder.read();
+
+  {
+    updateRunningApprover.set(Owned<ObjectApprover>(
+        new AcceptingObjectApprover()));
+
+    AWAIT_READY(event);
+
+    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
+    ASSERT_EQ(v1::TASK_RUNNING,
+              event->get().task_updated().state());
+    ASSERT_EQ(v1::TASK_RUNNING,
+              event->get().task_updated().status().state());
+    ASSERT_EQ(evolve(task.task_id()),
+              event->get().task_updated().status().task_id());
+  }
+
+  event = decoder.read();
+
+  {
+    updateFinishedApprover.set(Owned<ObjectApprover>(
+        new AcceptingObjectApprover()));
+
+    AWAIT_READY(event);
+
+    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
+    ASSERT_EQ(v1::TASK_FINISHED,
+              event->get().task_updated().state());
+    ASSERT_EQ(v1::TASK_FINISHED,
+              event->get().task_updated().status().state());
+    ASSERT_EQ(evolve(task.task_id()),
+              event->get().task_updated().status().task_id());
+  }
+
+  EXPECT_TRUE(reader.close());
+
+  EXPECT_CALL(authorizer, getObjectApprover(_, _))
+    .WillRepeatedly(Return(Owned<ObjectApprover>(
+        new AcceptingObjectApprover())));
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+}
+
+
 // This test tries to verify that a client subscribed to the 'api/v1' endpoint
 // can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and 'FRAMEWORK_REMOVED'
 // events.


[2/4] mesos git commit: Improved the validation of `ACKNOWLEDGE_OPERATION_STATUS` calls.

Posted by gr...@apache.org.
Improved the validation of `ACKNOWLEDGE_OPERATION_STATUS` calls.

Review: https://reviews.apache.org/r/65363/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2776724b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2776724b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2776724b

Branch: refs/heads/master
Commit: 2776724b00f4f50d61704e6d503f7791ba1f6773
Parents: 36aa75b
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Feb 7 14:21:08 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800

----------------------------------------------------------------------
 src/master/validation.cpp | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2776724b/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index f0b8677..42f767e 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -596,11 +596,26 @@ Option<Error> validate(
         return Error("Expecting 'acknowledge_operation_status' to be present");
       }
 
-      Try<id::UUID> uuid = id::UUID::fromBytes(
-          call.acknowledge_operation_status().uuid());
+      const mesos::scheduler::Call::AcknowledgeOperationStatus& acknowledge =
+        call.acknowledge_operation_status();
+
+      Try<id::UUID> uuid = id::UUID::fromBytes(acknowledge.uuid());
       if (uuid.isError()) {
         return uuid.error();
       }
+
+      // TODO(gkleiman): Revisit this once we introduce support for external
+      // resource providers.
+      if (!acknowledge.has_slave_id()) {
+        return Error("Expecting 'agent_id' to be present");
+      }
+
+      // TODO(gkleiman): Revisit this once agent supports sending status
+      // updates for operations affecting default resources (MESOS-8194).
+      if (!acknowledge.has_resource_provider_id()) {
+        return Error("Expecting 'resource_provider_id' to be present");
+      }
+
       return None();
     }