You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/07/25 05:06:30 UTC
git commit: Refactoring SchedulerCore (restartShards)
Repository: incubator-aurora
Updated Branches:
refs/heads/master 405c6c8ac -> 824f59d02
Refactoring SchedulerCore (restartShards)
Bugs closed: AURORA-94
Reviewed at https://reviews.apache.org/r/23834/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/824f59d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/824f59d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/824f59d0
Branch: refs/heads/master
Commit: 824f59d02ac6793d70031ffc4db16de4caedcf96
Parents: 405c6c8
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Jul 24 20:06:04 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Jul 24 20:06:04 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/state/SchedulerCore.java | 13 -----
.../scheduler/state/SchedulerCoreImpl.java | 37 --------------
.../thrift/SchedulerThriftInterface.java | 54 ++++++++++++++------
.../state/BaseSchedulerCoreImplTest.java | 42 ---------------
.../thrift/SchedulerThriftInterfaceTest.java | 38 ++++++++++----
5 files changed, 65 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index 38ef846..c636fd7 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.state;
-import java.util.Set;
-
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.scheduler.base.ScheduleException;
@@ -56,15 +54,4 @@ public interface SchedulerCore {
*/
void addInstances(IJobKey jobKey, ImmutableSet<Integer> instanceIds, ITaskConfig config)
throws ScheduleException;
-
- /**
- * Initiates a restart of shards within an active job.
- *
- * @param jobKey Key of job to be restarted.
- * @param shards Shards to be restarted.
- * @param requestingUser User performing the restart action.
- * @throws ScheduleException If there are no matching active shards.
- */
- void restartShards(IJobKey jobKey, Set<Integer> shards, String requestingUser)
- throws ScheduleException;
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index f053c72..3dcb1c3 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -20,7 +20,6 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
-import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@@ -29,7 +28,6 @@ import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.args.constraints.Positive;
-import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
@@ -51,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
/**
@@ -198,38 +195,4 @@ class SchedulerCoreImpl implements SchedulerCore {
}
});
}
-
- @Override
- public void restartShards(
- IJobKey jobKey,
- final Set<Integer> shards,
- final String requestingUser) throws ScheduleException {
-
- if (!JobKeys.isValid(jobKey)) {
- throw new ScheduleException("Invalid job key: " + jobKey);
- }
-
- if (shards.isEmpty()) {
- throw new ScheduleException("At least one shard must be specified.");
- }
-
- final Query.Builder query = Query.instanceScoped(jobKey, shards).active();
- storage.write(new MutateWork.NoResult<ScheduleException>() {
- @Override
- protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
- Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
- if (matchingTasks.size() != shards.size()) {
- throw new ScheduleException("Not all requested shards are active.");
- }
- LOG.info("Restarting shards matching " + query);
- for (String taskId : Tasks.ids(matchingTasks)) {
- stateManager.changeState(
- taskId,
- Optional.<ScheduleStatus>absent(),
- RESTARTING,
- Optional.of("Restarted by " + requestingUser));
- }
- }
- });
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index f2ad920..12de5a3 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -775,35 +775,50 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
@Override
public Response restartShards(
JobKey mutableJobKey,
- Set<Integer> shardIds,
- Lock mutableLock,
+ final Set<Integer> shardIds,
+ @Nullable final Lock mutableLock,
SessionKey session) {
- IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
+ final IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
checkNotBlank(shardIds);
requireNonNull(session);
+ final Response response = emptyResponse();
- Response response = Util.emptyResponse();
- SessionContext context;
+ final SessionContext context;
try {
context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
} catch (AuthFailedException e) {
return addMessage(response, AUTH_FAILED, e);
}
- try {
- lockManager.validateIfLocked(
- ILockKey.build(LockKey.job(jobKey.newBuilder())),
- Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
- schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
- response.setResponseCode(OK);
- } catch (LockException e) {
- addMessage(response, LOCK_ERROR, e);
- } catch (ScheduleException e) {
- addMessage(response, INVALID_REQUEST, e);
- }
+ return storage.write(new MutateWork.Quiet<Response>() {
+ @Override
+ public Response apply(MutableStoreProvider storeProvider) {
+ try {
+ lockManager.validateIfLocked(
+ ILockKey.build(LockKey.job(jobKey.newBuilder())),
+ Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+ } catch (LockException e) {
+ return addMessage(response, LOCK_ERROR, e);
+ }
- return response;
+ Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
+ Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
+ if (matchingTasks.size() != shardIds.size()) {
+ return addMessage(response, INVALID_REQUEST, "Not all requested shards are active.");
+ }
+
+ LOG.info("Restarting shards matching " + query);
+ for (String taskId : Tasks.ids(matchingTasks)) {
+ stateManager.changeState(
+ taskId,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.RESTARTING,
+ restartedByMessage(context.getIdentity()));
+ }
+ return response.setResponseCode(OK);
+ }
+ });
}
@Override
@@ -1240,6 +1255,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
}
@VisibleForTesting
+ static Optional<String> restartedByMessage(String user) {
+ return Optional.of("Restarted by " + user);
+ }
+
+ @VisibleForTesting
static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill.";
private static Response okEmptyResponse() {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 0206d00..fa611a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -78,7 +78,6 @@ import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
@@ -500,47 +499,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
}
@Test
- public void testRestartShards() throws Exception {
- expectKillTask(2);
- expectTaskNotThrottled().times(2);
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 6));
- changeStatus(Query.jobScoped(KEY_A), ASSIGNED, RUNNING);
- scheduler.restartShards(KEY_A, ImmutableSet.of(1, 5), OWNER_A.user);
- assertEquals(4, getTasks(Query.unscoped().byStatus(RUNNING)).size());
- assertEquals(2, getTasks(Query.unscoped().byStatus(RESTARTING)).size());
- changeStatus(Query.unscoped().byStatus(RESTARTING), FINISHED);
- assertEquals(2, getTasks(Query.unscoped().byStatus(PENDING)).size());
- }
-
- @Test(expected = ScheduleException.class)
- public void testRestartNonexistentShard() throws Exception {
- expectTaskNotThrottled();
- expectNoCronJob(KEY_A);
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
- changeStatus(Query.jobScoped(KEY_A), ASSIGNED, FINISHED);
- scheduler.restartShards(KEY_A, ImmutableSet.of(5), OWNER_A.user);
- }
-
- @Test
- public void testRestartPendingShard() throws Exception {
- expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-
- control.replay();
- buildScheduler();
-
- scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
- scheduler.restartShards(KEY_A, ImmutableSet.of(0), OWNER_A.user);
- }
-
- @Test
public void testAuditMessage() throws Exception {
control.replay();
buildScheduler();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index b8677ae..d34bd6f 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -140,6 +140,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
import static org.apache.aurora.gen.apiConstants.THRIFT_API_VERSION;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.restartedByMessage;
import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.transitionMessage;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
@@ -662,11 +663,19 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
@Test
public void testRestartShards() throws Exception {
- Set<Integer> shards = ImmutableSet.of(1, 6);
+ Set<Integer> shards = ImmutableSet.of(0);
expectAuth(ROLE, true);
lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
- scheduler.restartShards(JOB_KEY, shards, USER);
+ storageUtil.expectTaskFetch(
+ Query.instanceScoped(JOB_KEY, shards).active(),
+ buildScheduledTask());
+
+ expect(stateManager.changeState(
+ TASK_ID,
+ Optional.<ScheduleStatus>absent(),
+ ScheduleStatus.RESTARTING,
+ restartedByMessage(USER))).andReturn(true);
control.replay();
@@ -675,6 +684,17 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
+ public void testRestartShardsAuthFailure() throws Exception {
+ expectAuth(ROLE, false);
+
+ control.replay();
+
+ assertResponse(
+ AUTH_FAILED,
+ thrift.restartShards(JOB_KEY.newBuilder(), ImmutableSet.of(0), DEFAULT_LOCK, SESSION));
+ }
+
+ @Test
public void testRestartShardsLockCheckFails() throws Exception {
Set<Integer> shards = ImmutableSet.of(1, 6);
@@ -690,20 +710,18 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
}
@Test
- public void testRestartShardsFails() throws Exception {
+ public void testRestartShardsNotFoundTasksFailure() throws Exception {
Set<Integer> shards = ImmutableSet.of(1, 6);
- String message = "Injected.";
expectAuth(ROLE, true);
- lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
- scheduler.restartShards(JOB_KEY, shards, USER);
- expectLastCall().andThrow(new ScheduleException(message));
+ lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
+ storageUtil.expectTaskFetch(Query.instanceScoped(JOB_KEY, shards).active());
control.replay();
- Response resp = thrift.restartShards(JOB_KEY.newBuilder(), shards, DEFAULT_LOCK, SESSION);
- assertResponse(INVALID_REQUEST, resp);
- assertMessageMatches(resp, message);
+ assertResponse(
+ INVALID_REQUEST,
+ thrift.restartShards(JOB_KEY.newBuilder(), shards, LOCK.newBuilder(), SESSION));
}
@Test