You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2020/05/07 01:06:22 UTC
[mesos] branch 1.9.x updated: Fixed a bug in the agent's draining
handler.
This is an automated email from the ASF dual-hosted git repository.
grag pushed a commit to branch 1.9.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/1.9.x by this push:
new 35912a2 Fixed a bug in the agent's draining handler.
35912a2 is described below
commit 35912a22081e88ba243d2b690667dff6a90c51d0
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Wed May 6 16:35:19 2020 -0700
Fixed a bug in the agent's draining handler.
Previously, when the agent had no tasks or operations and
received a `DrainSlaveMessage`, it would checkpoint the
`DrainConfig` to disk, implicitly placing it into a "draining"
state indefinitely. This patch updates the agent's handler to
avoid checkpointing anything to disk in this case.
The `SlaveTest.DrainInfoInAPIOutputs` test is also removed
and its functionality is moved into the test
`SlaveTest.DrainAgentKillsRunningTask`. The running task in
the latter test allows us to verify agent API outputs both
before and after the task's terminal update is acknowledged.
Review: https://reviews.apache.org/r/72368/
---
src/slave/slave.cpp | 12 +++
src/tests/slave_tests.cpp | 215 +++++++++++++++++++++++++---------------------
2 files changed, 127 insertions(+), 100 deletions(-)
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 23d2ddd..7110ff4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -998,6 +998,18 @@ void Slave::drain(
const UPID& from,
DrainSlaveMessage&& drainSlaveMessage)
{
+ if (operations.empty() && frameworks.empty()) {
+ LOG(INFO)
+ << "Received DrainConfig " << drainSlaveMessage.config()
+ << (drainConfig.isSome()
+ ? "; previously stored DrainConfig " + stringify(*drainConfig)
+ : "")
+ << "; agent has no stored frameworks, tasks, or operations,"
+ " so draining is already complete";
+
+ return;
+ }
+
hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (const auto& taskMap, framework->pendingTasks) {
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index c147bfc..335a1c4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -11928,97 +11928,8 @@ TEST_F(
}
-// When the agent receives a `DrainSlaveMessage` from the master, the agent's
-// drain info should be visible in the agent's API output.
-TEST_F(SlaveTest, DrainInfoInAPIOutputs)
-{
- Clock::pause();
-
- const int GRACE_PERIOD_NANOS = 1000000;
-
- Try<Owned<cluster::Master>> master = StartMaster();
- ASSERT_SOME(master);
-
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- StandaloneMasterDetector detector(master.get()->pid);
-
- slave::Flags slaveFlags = CreateSlaveFlags();
-
- Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
- ASSERT_SOME(slave);
-
- Clock::advance(slaveFlags.registration_backoff_factor);
-
- AWAIT_READY(slaveRegisteredMessage);
-
- // Simulate the master sending a `DrainSlaveMessage` to the agent.
- DurationInfo maxGracePeriod;
- maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
-
- DrainConfig drainConfig;
- drainConfig.set_mark_gone(true);
- drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
-
- DrainSlaveMessage drainSlaveMessage;
- drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
-
- process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
-
- Clock::settle();
-
- {
- v1::agent::Call call;
- call.set_type(v1::agent::Call::GET_AGENT);
-
- const ContentType contentType = ContentType::PROTOBUF;
-
- process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
- headers["Accept"] = stringify(contentType);
-
- Future<process::http::Response> httpResponse =
- process::http::post(
- slave.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, call),
- stringify(contentType));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
-
- Future<v1::agent::Response> responseMessage =
- deserialize<v1::agent::Response>(contentType, httpResponse->body);
-
- AWAIT_READY(responseMessage);
- ASSERT_TRUE(responseMessage->IsInitialized());
- ASSERT_EQ(v1::agent::Response::GET_AGENT, responseMessage->type());
- ASSERT_TRUE(responseMessage->get_agent().has_drain_config());
- EXPECT_EQ(
- drainConfig,
- devolve(responseMessage->get_agent().drain_config()));
- }
-
- {
- Future<Response> response = process::http::get(
- slave.get()->pid,
- "state",
- None(),
- createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
- Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body);
-
- ASSERT_SOME(state);
-
- EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]);
- }
-}
-
-
// When an agent receives a `DrainSlaveMessage`, it should kill running tasks.
+// Agent API outputs related to draining are also verified.
TEST_F(SlaveTest, DrainAgentKillsRunningTask)
{
Clock::pause();
@@ -12033,23 +11944,19 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
slave::Flags slaveFlags = CreateSlaveFlags();
- Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+ Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags, true);
ASSERT_SOME(slave);
+ slave.get()->start();
+
Clock::advance(slaveFlags.registration_backoff_factor);
AWAIT_READY(updateSlaveMessage);
- // Set the partition-aware capability to ensure that the terminal update state
- // is TASK_GONE_BY_OPERATOR, since we will set `mark_gone = true`.
- v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.add_capabilities()->set_type(
- v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -12119,7 +12026,6 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
maxGracePeriod.set_nanoseconds(0);
DrainConfig drainConfig;
- drainConfig.set_mark_gone(true);
drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
DrainSlaveMessage drainSlaveMessage;
@@ -12129,9 +12035,118 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
AWAIT_READY(killedUpdate);
- EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+ EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
EXPECT_EQ(
v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
+
+ // Since the scheduler has not acknowledged the terminal task status update,
+ // the agent should still be in the draining state. Confirm that its drain
+ // info appears in API outputs.
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::GET_AGENT);
+
+ const ContentType contentType = ContentType::PROTOBUF;
+
+ process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+
+ Future<process::http::Response> httpResponse =
+ process::http::post(
+ slave.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
+
+ Future<v1::agent::Response> responseMessage =
+ deserialize<v1::agent::Response>(contentType, httpResponse->body);
+
+ AWAIT_READY(responseMessage);
+ ASSERT_TRUE(responseMessage->get_agent().has_drain_config());
+ EXPECT_EQ(
+ drainConfig,
+ devolve(responseMessage->get_agent().drain_config()));
+ }
+
+ {
+ Future<Response> response = process::http::get(
+ slave.get()->pid,
+ "state",
+ None(),
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+ Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body);
+
+ ASSERT_SOME(state);
+
+ EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]);
+ }
+
+ // Now acknowledge the terminal update and confirm that the agent's drain info
+ // is gone.
+
+ Future<StatusUpdateAcknowledgementMessage> terminalAcknowledgement =
+ FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+ // The agent won't complete draining until the framework has been removed.
+ // Set up an expectation to await on this event.
+ Future<Nothing> removeFramework;
+ EXPECT_CALL(*slave.get()->mock(), removeFramework(_))
+ .WillOnce(DoAll(Invoke(slave.get()->mock(),
+ &MockSlave::unmocked_removeFramework),
+ FutureSatisfy(&removeFramework)));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(killedUpdate->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+ acknowledge->set_uuid(killedUpdate->status().uuid());
+
+ mesos.send(call);
+ }
+
+ // Resume the clock so that the timer used by `delay()` in `os::reap()` can
+ // elapse and allow the executor process to be reaped.
+ Clock::resume();
+
+ AWAIT_READY(terminalAcknowledgement);
+ AWAIT_READY(removeFramework);
+
+ {
+ v1::agent::Call call;
+ call.set_type(v1::agent::Call::GET_AGENT);
+
+ const ContentType contentType = ContentType::PROTOBUF;
+
+ process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+
+ Future<process::http::Response> httpResponse =
+ process::http::post(
+ slave.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
+
+ Future<v1::agent::Response> responseMessage =
+ deserialize<v1::agent::Response>(contentType, httpResponse->body);
+
+ AWAIT_READY(responseMessage);
+ ASSERT_FALSE(responseMessage->get_agent().has_drain_config());
+ }
}