You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/08 02:34:43 UTC
[1/4] mesos git commit: Tested reconciliation when operation is
dropped en route to agent.
Repository: mesos
Updated Branches:
refs/heads/master fbe806869 -> 93f48fd52
Tested reconciliation when operation is dropped en route to agent.
This patch adds
'StorageLocalResourceProviderTest.ROOT_ReconcileDroppedOperation'
in order to verify that reconciliation is performed correctly
when an operation is dropped on its way from the master to the
agent.
Review: https://reviews.apache.org/r/65039
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/93f48fd5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/93f48fd5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/93f48fd5
Branch: refs/heads/master
Commit: 93f48fd52074154074bfd4b4aa6ff7855034e609
Parents: e815417
Author: Greg Mann <gr...@gmail.com>
Authored: Wed Feb 7 14:53:41 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800
----------------------------------------------------------------------
.../storage_local_resource_provider_tests.cpp | 219 +++++++++++++++++++
1 file changed, 219 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/93f48fd5/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index fcac4c4..f390308 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -27,6 +27,8 @@
#include "linux/fs.hpp"
+#include "master/detector/standalone.hpp"
+
#include "module/manager.hpp"
#include "slave/container_daemon_process.hpp"
@@ -47,11 +49,13 @@ using std::vector;
using mesos::internal::slave::ContainerDaemonProcess;
using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
using process::Clock;
using process::Future;
using process::Owned;
+using testing::AtMost;
using testing::Sequence;
namespace mesos {
@@ -3048,6 +3052,221 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics)
prefix + "csi_node_plugin_terminations"));
}
+
+// Master reconciles operations that are missing from a re-registering slave.
+// In this case, the `ApplyOperationMessage` is dropped, so the resource
+// provider should send OPERATION_DROPPED. Only operations on resource provider
+// resources are exercised here; operations on agent default resources, for
+// which the agent generates the dropped status, are not covered by this test.
+TEST_F(StorageLocalResourceProviderTest, ROOT_ReconcileDroppedOperation)
+{
+ Clock::pause();
+
+ setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ StandaloneMasterDetector detector(master.get()->pid);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ // Since the local resource provider daemon is started after the agent is
+ // registered, it is guaranteed that the agent will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from the
+ // storage local resource provider.
+ //
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // periodically check if the CSI endpoint socket has been created by the
+ // plugin container, which runs in another Linux process. Since we do not have
+ // a `Future` linked to the standalone container launch to await on, it is
+ // difficult to accomplish this without resuming the clock.
+ Clock::resume();
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+ Clock::pause();
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // We are only interested in pre-existing volumes, which have IDs but no
+ // profile. We use pre-existing volumes to make it easy to send multiple
+ // operations on multiple resources.
+ auto isPreExistingVolume = [](const Resource& r) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().has_id() &&
+ !r.disk().source().has_profile();
+ };
+
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (the default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Decline offers that contain only the agent's default resources.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers(filters));
+
+ Future<vector<Offer>> offersBeforeOperations;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ isPreExistingVolume)))
+ .WillOnce(FutureArg<1>(&offersBeforeOperations))
+ .WillRepeatedly(DeclineOffers(filters)); // Decline further matching offers.
+
+ driver.start();
+
+ AWAIT_READY(offersBeforeOperations);
+ ASSERT_FALSE(offersBeforeOperations->empty());
+
+ vector<Resource> sources;
+
+ foreach (
+ const Resource& resource,
+ offersBeforeOperations->at(0).resources()) {
+ if (isPreExistingVolume(resource) &&
+ resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+ sources.push_back(resource);
+ }
+ }
+
+ ASSERT_EQ(2u, sources.size());
+
+ // Drop one of the operations on the way to the agent.
+ Future<ApplyOperationMessage> applyOperationMessage =
+ DROP_PROTOBUF(ApplyOperationMessage(), _, _);
+
+ // The successful operation will result in a terminal update.
+ Future<UpdateOperationStatusMessage> operationFinishedStatus =
+ FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+ // Attempt the creation of two volumes.
+ driver.acceptOffers(
+ {offersBeforeOperations->at(0).id()},
+ {CREATE_VOLUME(sources.at(0), Resource::DiskInfo::Source::MOUNT),
+ CREATE_VOLUME(sources.at(1), Resource::DiskInfo::Source::MOUNT)},
+ filters);
+
+ // Ensure that the operations are processed.
+ Clock::settle();
+
+ AWAIT_READY(applyOperationMessage);
+ AWAIT_READY(operationFinishedStatus);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Observe explicit operation reconciliation between master and agent.
+ Future<ReconcileOperationsMessage> reconcileOperationsMessage =
+ FUTURE_PROTOBUF(ReconcileOperationsMessage(), _, _);
+ Future<UpdateOperationStatusMessage> operationDroppedStatus =
+ FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+ // The master may send an offer with the agent's resources after the agent
+ // reregisters, but before an `UpdateSlaveMessage` is sent containing the
+ // resource provider's resources. In this case, the offer will be rescinded.
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .Times(AtMost(1));
+
+ // Simulate a spurious master change event (e.g., due to ZooKeeper
+ // expiration) at the slave to force re-registration.
+ detector.appoint(master.get()->pid);
+
+ // Advance the clock to trigger agent re-registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveReregisteredMessage);
+ AWAIT_READY(reconcileOperationsMessage);
+ AWAIT_READY(operationDroppedStatus);
+
+ std::set<OperationState> expectedStates =
+ {OperationState::OPERATION_DROPPED,
+ OperationState::OPERATION_FINISHED};
+
+ std::set<OperationState> observedStates =
+ {operationFinishedStatus->status().state(),
+ operationDroppedStatus->status().state()};
+
+ ASSERT_EQ(expectedStates, observedStates);
+
+ Future<vector<Offer>> offersAfterOperations;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ isPreExistingVolume)))
+ .WillOnce(FutureArg<1>(&offersAfterOperations));
+
+ // Advance the clock to trigger a batch allocation.
+ Clock::advance(masterFlags.allocation_interval);
+
+ AWAIT_READY(offersAfterOperations);
+ ASSERT_FALSE(offersAfterOperations->empty());
+
+ vector<Resource> converted;
+
+ foreach (const Resource& resource, offersAfterOperations->at(0).resources()) {
+ if (isPreExistingVolume(resource) &&
+ resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
+ converted.push_back(resource);
+ }
+ }
+
+ ASSERT_EQ(1u, converted.size());
+
+ // TODO(greggomann): Add inspection of dropped operation metrics here once
+ // such metrics have been added. See MESOS-8406.
+
+ // Settle the clock to ensure that unexpected messages will cause errors.
+ Clock::settle();
+
+ driver.stop();
+ driver.join();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[4/4] mesos git commit: Added a method to increment invalid scheduler
API call counters.
Posted by gr...@apache.org.
Added a method to increment invalid scheduler API call counters.
Review: https://reviews.apache.org/r/65362/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/36aa75bd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/36aa75bd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/36aa75bd
Branch: refs/heads/master
Commit: 36aa75bd0abc7e7d18fd9ff55f26074e000716e0
Parents: fbe8068
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Feb 7 14:18:22 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800
----------------------------------------------------------------------
src/master/http.cpp | 1 +
src/master/master.cpp | 1 +
src/master/metrics.cpp | 14 ++++++++++++++
src/master/metrics.hpp | 2 ++
4 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index c489b6f..46f2872 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -980,6 +980,7 @@ Future<Response> Master::Http::scheduler(
Option<Error> error = validation::scheduler::call::validate(call, principal);
if (error.isSome()) {
+ master->metrics->incrementInvalidSchedulerCalls(call);
return BadRequest("Failed to validate scheduler::Call: " +
error.get().message);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc2685a..d7d2286 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2336,6 +2336,7 @@ void Master::receive(
Option<Error> error = validation::scheduler::call::validate(call);
if (error.isSome()) {
+ metrics->incrementInvalidSchedulerCalls(call);
drop(from, call, error->message);
return;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 64fc829..b627372 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -481,6 +481,20 @@ Metrics::~Metrics()
}
+void Metrics::incrementInvalidSchedulerCalls(const scheduler::Call& call) {
+ if (call.type() == scheduler::Call::ACKNOWLEDGE) {
+ invalid_status_update_acknowledgements++;
+ }
+
+ if (call.type() == scheduler::Call::MESSAGE) {
+ invalid_framework_to_executor_messages++;
+ }
+
+ // TODO(gkleiman): Increment other metrics when we add counters for all
+ // the different types of scheduler calls. See MESOS-8533.
+}
+
+
void Metrics::incrementTasksStates(
const TaskState& state,
const TaskStatus::Source& source,
http://git-wip-us.apache.org/repos/asf/mesos/blob/36aa75bd/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index f701efe..b343904 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -201,6 +201,8 @@ struct Metrics
std::vector<process::metrics::Gauge> resources_revocable_used;
std::vector<process::metrics::Gauge> resources_revocable_percent;
+ void incrementInvalidSchedulerCalls(const scheduler::Call& call);
+
void incrementTasksStates(
const TaskState& state,
const TaskStatus::Source& source,
[3/4] mesos git commit: Added test for delayed authorization during
operator events.
Posted by gr...@apache.org.
Added test for delayed authorization during operator events.
Until the fix for MESOS-8469, it was possible for the master
operator event stream to drop events, if event-related state in
the master changed in between asynchronous calls.
This patch adds `MasterAPITest.EventAuthorizationDelayed` to
verify the fix for that issue.
Review: https://reviews.apache.org/r/65316/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e815417b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e815417b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e815417b
Branch: refs/heads/master
Commit: e815417b235f9102f7740c55b700af6788bfcabb
Parents: 2776724
Author: Greg Mann <gr...@mesosphere.io>
Authored: Wed Feb 7 14:51:02 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800
----------------------------------------------------------------------
src/tests/api_tests.cpp | 244 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 244 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e815417b/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index b639a4b..43517b5 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -103,6 +103,7 @@ using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::Return;
+using testing::Sequence;
using testing::WithParamInterface;
namespace mesos {
@@ -2559,6 +2560,249 @@ TEST_P(MasterAPITest, EventAuthorizationFiltering)
}
+// Operator API events are sent using an asynchronous call chain. When
+// event-related state changes in the master before the authorizer returns, the
+// continuations which actually send the event should still have a consistent
+// view of the master state from the time when the event occurred. This test
+// forces task removal in the master before the authorizer returns in order to
+// verify that events are sent correctly in that case.
+TEST_P(MasterAPITest, EventAuthorizationDelayed)
+{
+ Clock::pause();
+
+ ContentType contentType = GetParam();
+
+ MockAuthorizer authorizer;
+ Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+ auto executor = std::make_shared<v1::MockHTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), &containerizer, slaveFlags);
+ ASSERT_SOME(slave);
+
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ contentType,
+ scheduler);
+
+ AWAIT_READY(subscribed);
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ // Create an event stream after seeing first offer but before a task is
+ // launched. We should see one framework, one agent, and no tasks/executors.
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+
+ Future<http::Response> response = http::streaming::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, v1Call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+
+ auto deserializer =
+ lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+ Reader<v1::master::Event> decoder(
+ Decoder<v1::master::Event>(deserializer), reader);
+
+ Future<Result<v1::master::Event>> event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+ const v1::master::Response::GetState& getState =
+ event->get().subscribed().get_state();
+
+ EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
+ EXPECT_EQ(1, getState.get_agents().agents_size());
+ EXPECT_TRUE(getState.get_tasks().tasks().empty());
+ EXPECT_TRUE(getState.get_executors().executors().empty());
+
+ event = decoder.read();
+
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ event = decoder.read();
+ EXPECT_TRUE(event.isPending());
+
+ // When the authorizer is called, return pending futures
+ // that we can satisfy later.
+ Promise<Owned<ObjectApprover>> taskAddedApprover;
+ Promise<Owned<ObjectApprover>> updateRunningApprover;
+ Promise<Owned<ObjectApprover>> updateFinishedApprover;
+
+ Sequence approverSequence;
+
+ // Each event results in 4 calls into the authorizer.
+ // NOTE: This may change when the operator event stream code is refactored
+ // to avoid unnecessary authorizer calls. See MESOS-8475.
+ EXPECT_CALL(authorizer, getObjectApprover(_, _))
+ .Times(4)
+ .InSequence(approverSequence)
+ .WillRepeatedly(Return(taskAddedApprover.future()));
+ EXPECT_CALL(authorizer, getObjectApprover(_, _))
+ .Times(4)
+ .InSequence(approverSequence)
+ .WillRepeatedly(Return(updateRunningApprover.future()));
+ EXPECT_CALL(authorizer, getObjectApprover(_, _))
+ .Times(4)
+ .InSequence(approverSequence)
+ .WillRepeatedly(Return(updateFinishedApprover.future()));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::AgentID slaveId(offer.agent_id());
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId))
+ .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, slaveId));
+
+ // Capture the acknowledgement messages to the agent so that we can delay the
+ // authorizer until the master has processed the terminal acknowledgement.
+ // NOTE: These calls are in reverse order because they use `EXPECT_CALL` under
+ // the hood, and such expectations are evaluated in reverse order.
+ Future<StatusUpdateAcknowledgementMessage> acknowledgeFinished =
+ FUTURE_PROTOBUF(
+ StatusUpdateAcknowledgementMessage(),
+ master.get()->pid,
+ slave.get()->pid);
+ Future<StatusUpdateAcknowledgementMessage> acknowledgeRunning =
+ FUTURE_PROTOBUF(
+ StatusUpdateAcknowledgementMessage(),
+ master.get()->pid,
+ slave.get()->pid);
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(v1::executor::SendUpdateFromTask(
+ frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+ EXPECT_CALL(*executor, acknowledged(_, _))
+ .WillOnce(v1::executor::SendUpdateFromTaskID(
+ frameworkId, evolve(executorId), v1::TASK_FINISHED))
+ .WillOnce(Return());
+
+ TaskInfo task = createTask(devolve(offer), "", executorId);
+
+ mesos.send(v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH({evolve(task)})}));
+
+ // Wait until the task has finished and the task update acknowledgements
+ // have been processed before allowing the authorizer to return.
+ AWAIT_READY(acknowledgeRunning);
+ AWAIT_READY(acknowledgeFinished);
+
+ {
+ taskAddedApprover.set(Owned<ObjectApprover>(new AcceptingObjectApprover()));
+
+ AWAIT_READY(event);
+
+ ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
+ ASSERT_EQ(evolve(task.task_id()),
+ event->get().task_added().task().task_id());
+ }
+
+ event = decoder.read();
+
+ {
+ updateRunningApprover.set(Owned<ObjectApprover>(
+ new AcceptingObjectApprover()));
+
+ AWAIT_READY(event);
+
+ ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
+ ASSERT_EQ(v1::TASK_RUNNING,
+ event->get().task_updated().state());
+ ASSERT_EQ(v1::TASK_RUNNING,
+ event->get().task_updated().status().state());
+ ASSERT_EQ(evolve(task.task_id()),
+ event->get().task_updated().status().task_id());
+ }
+
+ event = decoder.read();
+
+ {
+ updateFinishedApprover.set(Owned<ObjectApprover>(
+ new AcceptingObjectApprover()));
+
+ AWAIT_READY(event);
+
+ ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
+ ASSERT_EQ(v1::TASK_FINISHED,
+ event->get().task_updated().state());
+ ASSERT_EQ(v1::TASK_FINISHED,
+ event->get().task_updated().status().state());
+ ASSERT_EQ(evolve(task.task_id()),
+ event->get().task_updated().status().task_id());
+ }
+
+ EXPECT_TRUE(reader.close());
+
+ EXPECT_CALL(authorizer, getObjectApprover(_, _))
+ .WillRepeatedly(Return(Owned<ObjectApprover>(
+ new AcceptingObjectApprover())));
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+}
+
+
// This test tries to verify that a client subscribed to the 'api/v1' endpoint
// can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and `FRAMEWORK_REMOVED`
// events.
[2/4] mesos git commit: Improved the validation of
`ACKNOWLEDGE_OPERATION_STATUS` calls.
Posted by gr...@apache.org.
Improved the validation of `ACKNOWLEDGE_OPERATION_STATUS` calls.
Review: https://reviews.apache.org/r/65363/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2776724b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2776724b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2776724b
Branch: refs/heads/master
Commit: 2776724b00f4f50d61704e6d503f7791ba1f6773
Parents: 36aa75b
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Feb 7 14:21:08 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 7 18:31:29 2018 -0800
----------------------------------------------------------------------
src/master/validation.cpp | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2776724b/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index f0b8677..42f767e 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -596,11 +596,26 @@ Option<Error> validate(
return Error("Expecting 'acknowledge_operation_status' to be present");
}
- Try<id::UUID> uuid = id::UUID::fromBytes(
- call.acknowledge_operation_status().uuid());
+ const mesos::scheduler::Call::AcknowledgeOperationStatus& acknowledge =
+ call.acknowledge_operation_status();
+
+ Try<id::UUID> uuid = id::UUID::fromBytes(acknowledge.uuid());
if (uuid.isError()) {
return uuid.error();
}
+
+ // TODO(gkleiman): Revisit this once we introduce support for external
+ // resource providers.
+ if (!acknowledge.has_slave_id()) {
+ return Error("Expecting 'agent_id' to be present");
+ }
+
+ // TODO(gkleiman): Revisit this once agent supports sending status
+ // updates for operations affecting default resources (MESOS-8194).
+ if (!acknowledge.has_resource_provider_id()) {
+ return Error("Expecting 'resource_provider_id' to be present");
+ }
+
return None();
}