You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by dm...@apache.org on 2017/02/14 21:34:12 UTC

aurora git commit: Expose task pruning endpoint in aurora_admin. Useful for scale testing in order to 'clean up' after a test run, but also useful in production if you have a bad actor inflating the size of your task index.

Repository: aurora
Updated Branches:
  refs/heads/master 40d91feb7 -> 0e9c0864e


Expose task pruning endpoint in aurora_admin. Useful for scale testing in order to 'clean up' after a test run, but also useful in production if you have a bad actor inflating the size of your task index.

Bugs closed: AURORA-1893

Reviewed at https://reviews.apache.org/r/56629/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0e9c0864
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0e9c0864
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0e9c0864

Branch: refs/heads/master
Commit: 0e9c0864eb1c8587616b6c10dfe104327b005e94
Parents: 40d91fe
Author: David McLaughlin <da...@dmclaughlin.com>
Authored: Tue Feb 14 13:33:01 2017 -0800
Committer: David McLaughlin <dm...@twitter.com>
Committed: Tue Feb 14 13:33:01 2017 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |  1 +
 .../thrift/org/apache/aurora/gen/api.thrift     |  7 +++
 .../thrift/SchedulerThriftInterface.java        | 28 +++++++++
 src/main/python/apache/aurora/admin/admin.py    | 32 ++++++++++
 .../python/apache/aurora/client/api/__init__.py |  3 +
 .../thrift/SchedulerThriftInterfaceTest.java    | 63 ++++++++++++++++++++
 .../python/apache/aurora/admin/test_admin.py    | 44 ++++++++++++++
 .../aurora/client/api/test_scheduler_client.py  |  6 ++
 8 files changed, 184 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 3e98802..ff382ff 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -4,6 +4,7 @@
 ### New/updated:
 
 - Add message parameter to `killTasks` RPC.
+- Add `prune_tasks` command to `aurora_admin`, backed by the new `pruneTasks` RPC. See `aurora_admin prune_tasks -h` for usage information.
 
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index 6205c2e..efd4e53 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -1230,6 +1230,13 @@ service AuroraAdmin extends AuroraSchedulerManager {
 
   /** Tell scheduler to trigger an implicit task reconciliation. */
   Response triggerImplicitTaskReconciliation()
+
+  /**
+   * Force prune any (terminal) tasks that match the query. If no statuses are supplied with the
+   * query, it will default to all terminal task states. If statuses are supplied, they must be
+   * terminal states.
+   */
+  Response pruneTasks(1: TaskQuery query)
 }
 
 // The name of the header that should be sent to bypass leader redirection in the Scheduler.

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index a211483..059fbb8 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.gen.ConfigRewrite;
@@ -137,6 +138,7 @@ import static org.apache.aurora.gen.ResponseCode.WARNING;
 import static org.apache.aurora.scheduler.base.Numbers.convertRanges;
 import static org.apache.aurora.scheduler.base.Numbers.toRanges;
 import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
+import static org.apache.aurora.scheduler.base.Tasks.TERMINAL_STATES;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 import static org.apache.aurora.scheduler.thrift.Responses.addMessage;
 import static org.apache.aurora.scheduler.thrift.Responses.empty;
@@ -1115,6 +1117,32 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
     }
   }
 
+  @Override
+  public Response pruneTasks(TaskQuery query) throws TException {
+    if (query.isSetStatuses() && query.getStatuses().stream().anyMatch(ACTIVE_STATES::contains)) {
+      return error("Tasks in non-terminal state cannot be pruned.");
+    } else if (!query.isSetStatuses()) {
+      query.setStatuses(TERMINAL_STATES);
+    }
+
+    Iterable<IScheduledTask> tasks = storage.read(storeProvider ->
+        storeProvider.getTaskStore().fetchTasks(Query.arbitrary(query)));
+    // For some reason fetchTasks ignores the offset/limit options of a TaskQuery. So we have to
+    // manually apply the limit here. To be fixed in AURORA-1892.
+    if (query.isSetLimit()) {
+      tasks = Iterables.limit(tasks, query.getLimit());
+    }
+
+    Iterable<String> taskIds = Iterables.transform(
+        tasks,
+        task -> task.getAssignedTask().getTaskId());
+
+    return storage.write(storeProvider -> {
+      stateManager.deleteTasks(storeProvider, Sets.newHashSet(taskIds));
+      return ok();
+    });
+  }
+
   @ThriftWorkload
   @Override
   public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) throws TException {

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/python/apache/aurora/admin/admin.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/admin/admin.py b/src/main/python/apache/aurora/admin/admin.py
index 070c348..0bb995c 100644
--- a/src/main/python/apache/aurora/admin/admin.py
+++ b/src/main/python/apache/aurora/admin/admin.py
@@ -72,6 +72,38 @@ def make_admin_client_with_options(cluster):
 
 
 @app.command
+@app.command_option('--states', dest='states', default=None,
+    help='Only match tasks with given state(s).')
+@app.command_option('--role', dest='role', default=None,
+    help='Only match tasks with given role.')
+@app.command_option('--env', dest='environment', default=None,
+    help='Only match tasks with given environment.')
+@app.command_option('--limit', dest='limit', default=None, type=int,
+    help='Limit the number of total tasks to prune.')
+def prune_tasks(args, options):
+  if len(args) == 0:
+    die('Must specify at least cluster.')
+  cluster = args[0]
+
+  t = TaskQuery()
+  if options.states:
+    t.statuses = set(map(ScheduleStatus._NAMES_TO_VALUES.get, options.states.split(',')))
+  if options.role:
+    t.role = options.role
+  if options.environment:
+    t.environment = options.environment
+  if options.limit:
+    t.limit = options.limit
+
+  api = make_admin_client_with_options(cluster)
+  rsp = api.prune_tasks(t)
+  if rsp.responseCode != ResponseCode.OK:
+    die('Failed to prune tasks: %s' % combine_messages(rsp))
+  else:
+    print("Tasks pruned.")
+
+
+@app.command
 @app.command_option('--force', dest='force', default=False, action='store_true',
     help='Force expensive queries to run.')
 @app.command_option('--shards', dest='shards', default=None,

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/main/python/apache/aurora/client/api/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/__init__.py b/src/main/python/apache/aurora/client/api/__init__.py
index 1250ccd..a4639db 100644
--- a/src/main/python/apache/aurora/client/api/__init__.py
+++ b/src/main/python/apache/aurora/client/api/__init__.py
@@ -343,6 +343,9 @@ class AuroraClientAPI(object):
   def snapshot(self):
     return self._scheduler_proxy.snapshot()
 
+  def prune_tasks(self, query):
+    return self._scheduler_proxy.pruneTasks(query)
+
   def unsafe_rewrite_config(self, rewrite_request):
     return self._scheduler_proxy.rewriteConfigs(rewrite_request)
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 0cdd982..c36abc8 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -63,6 +63,7 @@ import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ReadOnlyScheduler;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.Response;
+import org.apache.aurora.gen.ResponseCode;
 import org.apache.aurora.gen.ResponseDetail;
 import org.apache.aurora.gen.Result;
 import org.apache.aurora.gen.RewriteConfigsRequest;
@@ -71,12 +72,14 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.StartJobUpdateResult;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskConstraint;
+import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.gen.ValueConstraint;
 import org.apache.aurora.gen.apiConstants;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
 import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
@@ -782,6 +785,66 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testPruneTasksRejectsActiveStates() throws Exception {
+    control.replay();
+    Response rsp = thrift.pruneTasks(new TaskQuery().setStatuses(Tasks.ACTIVE_STATES));
+    assertResponse(ResponseCode.ERROR, rsp);
+  }
+
+  @Test
+  public void testPruneTasksRejectsMixedStates() throws Exception {
+    control.replay();
+    Response rsp = thrift.pruneTasks(new TaskQuery().setStatuses(
+        ImmutableSet.of(ScheduleStatus.FINISHED, ScheduleStatus.KILLING)));
+    assertResponse(ResponseCode.ERROR, rsp);
+  }
+
+  @Test
+  public void testPruneTasksAddsDefaultStatuses() throws Exception {
+    storageUtil.expectTaskFetch(
+        Query.arbitrary(new TaskQuery().setStatuses(Tasks.TERMINAL_STATES)),
+        buildScheduledTask());
+    stateManager.deleteTasks(
+        storageUtil.mutableStoreProvider,
+        ImmutableSet.of(buildScheduledTask().getAssignedTask().getTaskId()));
+    control.replay();
+
+    assertOkResponse(thrift.pruneTasks(new TaskQuery()));
+  }
+
+  @Test
+  public void testPruneTasksRespectsScopedTerminalState() throws Exception {
+    storageUtil.expectTaskFetch(
+        Query.arbitrary(new TaskQuery().setStatuses(ImmutableSet.of(ScheduleStatus.FAILED))),
+        buildScheduledTask());
+    stateManager.deleteTasks(
+        storageUtil.mutableStoreProvider,
+        ImmutableSet.of(buildScheduledTask().getAssignedTask().getTaskId()));
+    control.replay();
+
+    assertOkResponse(thrift.pruneTasks(
+        new TaskQuery().setStatuses(ImmutableSet.of(ScheduleStatus.FAILED))));
+  }
+
+  @Test
+  public void testPruneTasksAppliesQueryLimit() throws Exception {
+    TaskQuery query = new TaskQuery().setLimit(3);
+    storageUtil.expectTaskFetch(
+        Query.arbitrary(query.setStatuses(Tasks.TERMINAL_STATES)),
+        buildScheduledTask("a/b/c", "task1"),
+        buildScheduledTask("a/b/c", "task2"),
+        buildScheduledTask("a/b/c", "task3"),
+        buildScheduledTask("a/b/c", "task4"),
+        buildScheduledTask("a/b/c", "task5"));
+    stateManager.deleteTasks(
+        storageUtil.mutableStoreProvider,
+        ImmutableSet.of("task1", "task2", "task3"));
+    control.replay();
+
+    assertOkResponse(thrift.pruneTasks(query));
+  }
+
+  @Test
   public void testSetQuota() throws Exception {
     ResourceAggregate resourceAggregate = new ResourceAggregate()
         .setNumCpus(10)

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/python/apache/aurora/admin/test_admin.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/admin/test_admin.py b/src/test/python/apache/aurora/admin/test_admin.py
index 66abade..ebe89b5 100644
--- a/src/test/python/apache/aurora/admin/test_admin.py
+++ b/src/test/python/apache/aurora/admin/test_admin.py
@@ -19,6 +19,7 @@ from mock import PropertyMock, call, create_autospec, patch
 from apache.aurora.admin.admin import (
     get_scheduler,
     increase_quota,
+    prune_tasks,
     query,
     reconcile_tasks,
     set_quota
@@ -45,6 +46,49 @@ from gen.apache.aurora.api.ttypes import (
 )
 
 
+class TestPruneCommand(AuroraClientCommandTest):
+  @classmethod
+  def setup_mock_options(cls, states=None, role=None, env=None, limit=None):
+    mock_options = create_autospec(
+        spec=['states', 'role', 'environment', 'limit'],
+        spec_set=False,
+        instance=True)
+
+    mock_options.role = role
+    mock_options.states = states
+    mock_options.environment = env
+    mock_options.limit = limit
+    mock_options.bypass_leader_redirect = False
+
+    return mock_options
+
+  @classmethod
+  def task_query(cls, options):
+    query = TaskQuery(
+        role=options.role,
+        environment=options.environment,
+        limit=options.limit)
+    if options.states:
+      query.statuses = set(map(ScheduleStatus._NAMES_TO_VALUES.get, options.states.split(',')))
+    return query
+
+  def test_prune(self):
+    mock_options = self.setup_mock_options(
+      role='aurora', env='devel', limit=10, states="LOST,FINISHED")
+    with contextlib.nested(
+        patch('twitter.common.app.get_options', return_value=mock_options),
+        patch('apache.aurora.admin.admin.make_admin_client',
+            return_value=create_autospec(spec=AuroraClientAPI)),
+        patch('apache.aurora.admin.admin.CLUSTERS', new=self.TEST_CLUSTERS)
+    ) as (_, mock_make_admin_client, _):
+      api = mock_make_admin_client.return_value
+      api.prune_tasks.return_value = Response(responseCode=ResponseCode.OK)
+
+      prune_tasks(['cluster'], mock_options)
+
+      api.prune_tasks.assert_called_once_with(self.task_query(mock_options))
+
+
 class TestQueryCommand(AuroraClientCommandTest):
 
   @classmethod

http://git-wip-us.apache.org/repos/asf/aurora/blob/0e9c0864/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index fab9798..59c651c 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -282,6 +282,12 @@ class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
     self.mox.ReplayAll()
     self.make_scheduler_proxy().snapshot()
 
+  def test_pruneTasks(self):
+    t = TaskQuery()
+    self.mock_thrift_client.pruneTasks(IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
+    self.mox.ReplayAll()
+    self.make_scheduler_proxy().pruneTasks(t)
+
   def test_rewriteConfigs(self):
     self.mock_thrift_client.rewriteConfigs(
         IsA(RewriteConfigsRequest)).AndReturn(DEFAULT_RESPONSE)