You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2017/02/14 21:34:12 UTC
aurora git commit: Expose task pruning endpoint in aurora_admin.
This is useful in scale testing for cleaning up after a test run, and in
production when a bad actor is inflating the size of your task index.
Repository: aurora
Updated Branches:
refs/heads/master 40d91feb7 -> 0e9c0864e
Expose task pruning endpoint in aurora_admin. This is useful in scale testing for cleaning up after a test run, and in production when a bad actor is inflating the size of your task index.
Bugs closed: AURORA-1893
Reviewed at https://reviews.apache.org/r/56629/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0e9c0864
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0e9c0864
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0e9c0864
Branch: refs/heads/master
Commit: 0e9c0864eb1c8587616b6c10dfe104327b005e94
Parents: 40d91fe
Author: David McLaughlin <da...@dmclaughlin.com>
Authored: Tue Feb 14 13:33:01 2017 -0800
Committer: David McLaughlin <dm...@twitter.com>
Committed: Tue Feb 14 13:33:01 2017 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 1 +
.../thrift/org/apache/aurora/gen/api.thrift | 7 +++
.../thrift/SchedulerThriftInterface.java | 28 +++++++++
src/main/python/apache/aurora/admin/admin.py | 32 ++++++++++
.../python/apache/aurora/client/api/__init__.py | 3 +
.../thrift/SchedulerThriftInterfaceTest.java | 63 ++++++++++++++++++++
.../python/apache/aurora/admin/test_admin.py | 44 ++++++++++++++
.../aurora/client/api/test_scheduler_client.py | 6 ++
8 files changed, 184 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 3e98802..ff382ff 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -4,6 +4,7 @@
### New/updated:
- Add message parameter to `killTasks` RPC.
+- Add `prune_tasks` endpoint to `aurora_admin`. See `aurora_admin prune_tasks -h` for usage information.
0.17.0
======
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index 6205c2e..efd4e53 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1230,6 +1230,13 @@ service AuroraAdmin extends AuroraSchedulerManager {
/** Tell scheduler to trigger an implicit task reconciliation. */
Response triggerImplicitTaskReconciliation()
+
+ /**
+ * Force prune any (terminal) tasks that match the query. If no statuses are supplied with the
+ * query, it will default to all terminal task states. If statuses are supplied, they must be
+ * terminal states.
+ */
+ Response pruneTasks(1: TaskQuery query)
}
// The name of the header that should be sent to bypass leader redirection in the Scheduler.
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index a211483..059fbb8 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.ConfigRewrite;
@@ -137,6 +138,7 @@ import static org.apache.aurora.gen.ResponseCode.WARNING;
import static org.apache.aurora.scheduler.base.Numbers.convertRanges;
import static org.apache.aurora.scheduler.base.Numbers.toRanges;
import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+import static org.apache.aurora.scheduler.base.Tasks.TERMINAL_STATES;
import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
import static org.apache.aurora.scheduler.thrift.Responses.addMessage;
import static org.apache.aurora.scheduler.thrift.Responses.empty;
@@ -1115,6 +1117,32 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
}
}
+ @Override
+ public Response pruneTasks(TaskQuery query) throws TException {
+ if (query.isSetStatuses() && query.getStatuses().stream().anyMatch(ACTIVE_STATES::contains)) {
+ return error("Tasks in non-terminal state cannot be pruned.");
+ } else if (!query.isSetStatuses()) {
+ query.setStatuses(TERMINAL_STATES);
+ }
+
+ Iterable<IScheduledTask> tasks = storage.read(storeProvider ->
+ storeProvider.getTaskStore().fetchTasks(Query.arbitrary(query)));
+ // For some reason fetchTasks ignores the offset/limit options of a TaskQuery. So we have to
+ // manually apply the limit here. To be fixed in AURORA-1892.
+ if (query.isSetLimit()) {
+ tasks = Iterables.limit(tasks, query.getLimit());
+ }
+
+ Iterable<String> taskIds = Iterables.transform(
+ tasks,
+ task -> task.getAssignedTask().getTaskId());
+
+ return storage.write(storeProvider -> {
+ stateManager.deleteTasks(storeProvider, Sets.newHashSet(taskIds));
+ return ok();
+ });
+ }
+
@ThriftWorkload
@Override
public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) throws TException {
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/python/apache/aurora/admin/admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/admin.py b/src/main/python/apache/aurora/admin/admin.py
index 070c348..0bb995c 100644
--- a/src/main/python/apache/aurora/admin/admin.py
+++ b/src/main/python/apache/aurora/admin/admin.py
@@ -72,6 +72,38 @@ def make_admin_client_with_options(cluster):
@app.command
+@app.command_option('--states', dest='states', default=None,
+ help='Only match tasks with given state(s).')
+@app.command_option('--role', dest='role', default=None,
+ help='Only match tasks with given role.')
+@app.command_option('--env', dest='environment', default=None,
+ help='Only match tasks with given environment.')
+@app.command_option('--limit', dest='limit', default=None, type=int,
+ help='Limit the number of total tasks to prune.')
+def prune_tasks(args, options):
+ if len(args) == 0:
+ die('Must specify at least cluster.')
+ cluster = args[0]
+
+ t = TaskQuery()
+ if options.states:
+ t.statuses = set(map(ScheduleStatus._NAMES_TO_VALUES.get, options.states.split(',')))
+ if options.role:
+ t.role = options.role
+ if options.environment:
+ t.environment = options.environment
+ if options.limit:
+ t.limit = options.limit
+
+ api = make_admin_client_with_options(cluster)
+ rsp = api.prune_tasks(t)
+ if rsp.responseCode != ResponseCode.OK:
+ die('Failed to prune tasks: %s' % combine_messages(rsp))
+ else:
+ print("Tasks pruned.")
+
+
+@app.command
@app.command_option('--force', dest='force', default=False, action='store_true',
help='Force expensive queries to run.')
@app.command_option('--shards', dest='shards', default=None,
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index 1250ccd..a4639db 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -343,6 +343,9 @@ class AuroraClientAPI(object):
def snapshot(self):
return self._scheduler_proxy.snapshot()
+ def prune_tasks(self, query):
+ return self._scheduler_proxy.pruneTasks(query)
+
def unsafe_rewrite_config(self, rewrite_request):
return self._scheduler_proxy.rewriteConfigs(rewrite_request)
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 0cdd982..c36abc8 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -63,6 +63,7 @@ import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.ReadOnlyScheduler;
import org.apache.aurora.gen.ResourceAggregate;
import org.apache.aurora.gen.Response;
+import org.apache.aurora.gen.ResponseCode;
import org.apache.aurora.gen.ResponseDetail;
import org.apache.aurora.gen.Result;
import org.apache.aurora.gen.RewriteConfigsRequest;
@@ -71,12 +72,14 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.StartJobUpdateResult;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskConstraint;
+import org.apache.aurora.gen.TaskQuery;
import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.gen.apiConstants;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
@@ -782,6 +785,66 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
+ public void testPruneTasksRejectsActiveStates() throws Exception {
+ control.replay();
+ Response rsp = thrift.pruneTasks(new TaskQuery().setStatuses(Tasks.ACTIVE_STATES));
+ assertResponse(ResponseCode.ERROR, rsp);
+ }
+
+ @Test
+ public void testPruneTasksRejectsMixedStates() throws Exception {
+ control.replay();
+ Response rsp = thrift.pruneTasks(new TaskQuery().setStatuses(
+ ImmutableSet.of(ScheduleStatus.FINISHED, ScheduleStatus.KILLING)));
+ assertResponse(ResponseCode.ERROR, rsp);
+ }
+
+ @Test
+ public void testPruneTasksAddsDefaultStatuses() throws Exception {
+ storageUtil.expectTaskFetch(
+ Query.arbitrary(new TaskQuery().setStatuses(Tasks.TERMINAL_STATES)),
+ buildScheduledTask());
+ stateManager.deleteTasks(
+ storageUtil.mutableStoreProvider,
+ ImmutableSet.of(buildScheduledTask().getAssignedTask().getTaskId()));
+ control.replay();
+
+ assertOkResponse(thrift.pruneTasks(new TaskQuery()));
+ }
+
+ @Test
+ public void testPruneTasksRespectsScopedTerminalState() throws Exception {
+ storageUtil.expectTaskFetch(
+ Query.arbitrary(new TaskQuery().setStatuses(ImmutableSet.of(ScheduleStatus.FAILED))),
+ buildScheduledTask());
+ stateManager.deleteTasks(
+ storageUtil.mutableStoreProvider,
+ ImmutableSet.of(buildScheduledTask().getAssignedTask().getTaskId()));
+ control.replay();
+
+ assertOkResponse(thrift.pruneTasks(
+ new TaskQuery().setStatuses(ImmutableSet.of(ScheduleStatus.FAILED))));
+ }
+
+ @Test
+ public void testPruneTasksAppliesQueryLimit() throws Exception {
+ TaskQuery query = new TaskQuery().setLimit(3);
+ storageUtil.expectTaskFetch(
+ Query.arbitrary(query.setStatuses(Tasks.TERMINAL_STATES)),
+ buildScheduledTask("a/b/c", "task1"),
+ buildScheduledTask("a/b/c", "task2"),
+ buildScheduledTask("a/b/c", "task3"),
+ buildScheduledTask("a/b/c", "task4"),
+ buildScheduledTask("a/b/c", "task5"));
+ stateManager.deleteTasks(
+ storageUtil.mutableStoreProvider,
+ ImmutableSet.of("task1", "task2", "task3"));
+ control.replay();
+
+ assertOkResponse(thrift.pruneTasks(query));
+ }
+
+ @Test
public void testSetQuota() throws Exception {
ResourceAggregate resourceAggregate = new ResourceAggregate()
.setNumCpus(10)
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/python/apache/aurora/admin/test_admin.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/admin/test_admin.py b/src/test/python/apache/aurora/admin/test_admin.py
index 66abade..ebe89b5 100644
--- a/src/test/python/apache/aurora/admin/test_admin.py
+++ b/src/test/python/apache/aurora/admin/test_admin.py
@@ -19,6 +19,7 @@ from mock import PropertyMock, call, create_autospec, patch
from apache.aurora.admin.admin import (
get_scheduler,
increase_quota,
+ prune_tasks,
query,
reconcile_tasks,
set_quota
@@ -45,6 +46,49 @@ from gen.apache.aurora.api.ttypes import (
)
+class TestPruneCommand(AuroraClientCommandTest):
+ @classmethod
+ def setup_mock_options(cls, states=None, role=None, env=None, limit=None):
+ mock_options = create_autospec(
+ spec=['states', 'role', 'environment', 'limit'],
+ spec_set=False,
+ instance=True)
+
+ mock_options.role = role
+ mock_options.states = states
+ mock_options.environment = env
+ mock_options.limit = limit
+ mock_options.bypass_leader_redirect = False
+
+ return mock_options
+
+ @classmethod
+ def task_query(cls, options):
+ query = TaskQuery(
+ role=options.role,
+ environment=options.environment,
+ limit=options.limit)
+ if options.states:
+ query.statuses = set(map(ScheduleStatus._NAMES_TO_VALUES.get, options.states.split(',')))
+ return query
+
+ def test_prune(self):
+ mock_options = self.setup_mock_options(
+ role='aurora', env='devel', limit=10, states="LOST,FINISHED")
+ with contextlib.nested(
+ patch('twitter.common.app.get_options', return_value=mock_options),
+ patch('apache.aurora.admin.admin.make_admin_client',
+ return_value=create_autospec(spec=AuroraClientAPI)),
+ patch('apache.aurora.admin.admin.CLUSTERS', new=self.TEST_CLUSTERS)
+ ) as (_, mock_make_admin_client, _):
+ api = mock_make_admin_client.return_value
+ api.prune_tasks.return_value = Response(responseCode=ResponseCode.OK)
+
+ prune_tasks(['cluster'], mock_options)
+
+ api.prune_tasks.assert_called_once_with(self.task_query(mock_options))
+
+
class TestQueryCommand(AuroraClientCommandTest):
@classmethod
http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index fab9798..59c651c 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -282,6 +282,12 @@ class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
self.mox.ReplayAll()
self.make_scheduler_proxy().snapshot()
+ def test_pruneTasks(self):
+ t = TaskQuery()
+ self.mock_thrift_client.pruneTasks(IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
+ self.mox.ReplayAll()
+ self.make_scheduler_proxy().pruneTasks(t)
+
def test_rewriteConfigs(self):
self.mock_thrift_client.rewriteConfigs(
IsA(RewriteConfigsRequest)).AndReturn(DEFAULT_RESPONSE)