You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/07/25 05:06:30 UTC

git commit: Refactoring SchedulerCore (restartShards)

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 405c6c8ac -> 824f59d02


Refactoring SchedulerCore (restartShards)

Bugs closed: AURORA-94

Reviewed at https://reviews.apache.org/r/23834/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/824f59d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/824f59d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/824f59d0

Branch: refs/heads/master
Commit: 824f59d02ac6793d70031ffc4db16de4caedcf96
Parents: 405c6c8
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Jul 24 20:06:04 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Jul 24 20:06:04 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/state/SchedulerCore.java   | 13 -----
 .../scheduler/state/SchedulerCoreImpl.java      | 37 --------------
 .../thrift/SchedulerThriftInterface.java        | 54 ++++++++++++++------
 .../state/BaseSchedulerCoreImplTest.java        | 42 ---------------
 .../thrift/SchedulerThriftInterfaceTest.java    | 38 ++++++++++----
 5 files changed, 65 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
index 38ef846..c636fd7 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCore.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.state;
 
-import java.util.Set;
-
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.scheduler.base.ScheduleException;
@@ -56,15 +54,4 @@ public interface SchedulerCore {
    */
   void addInstances(IJobKey jobKey, ImmutableSet<Integer> instanceIds, ITaskConfig config)
       throws ScheduleException;
-
-  /**
-   * Initiates a restart of shards within an active job.
-   *
-   * @param jobKey Key of job to be restarted.
-   * @param shards Shards to be restarted.
-   * @param requestingUser User performing the restart action.
-   * @throws ScheduleException If there are no matching active shards.
-   */
-  void restartShards(IJobKey jobKey, Set<Integer> shards, String requestingUser)
-      throws ScheduleException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
index f053c72..3dcb1c3 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SchedulerCoreImpl.java
@@ -20,7 +20,6 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
-import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -29,7 +28,6 @@ import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.args.constraints.Positive;
 
-import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
@@ -51,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
 
 /**
@@ -198,38 +195,4 @@ class SchedulerCoreImpl implements SchedulerCore {
       }
     });
   }
-
-  @Override
-  public void restartShards(
-      IJobKey jobKey,
-      final Set<Integer> shards,
-      final String requestingUser) throws ScheduleException {
-
-    if (!JobKeys.isValid(jobKey)) {
-      throw new ScheduleException("Invalid job key: " + jobKey);
-    }
-
-    if (shards.isEmpty()) {
-      throw new ScheduleException("At least one shard must be specified.");
-    }
-
-    final Query.Builder query = Query.instanceScoped(jobKey, shards).active();
-    storage.write(new MutateWork.NoResult<ScheduleException>() {
-      @Override
-      protected void execute(MutableStoreProvider storeProvider) throws ScheduleException {
-        Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
-        if (matchingTasks.size() != shards.size()) {
-          throw new ScheduleException("Not all requested shards are active.");
-        }
-        LOG.info("Restarting shards matching " + query);
-        for (String taskId : Tasks.ids(matchingTasks)) {
-          stateManager.changeState(
-              taskId,
-              Optional.<ScheduleStatus>absent(),
-              RESTARTING,
-              Optional.of("Restarted by " + requestingUser));
-        }
-      }
-    });
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index f2ad920..12de5a3 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -775,35 +775,50 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   @Override
   public Response restartShards(
       JobKey mutableJobKey,
-      Set<Integer> shardIds,
-      Lock mutableLock,
+      final Set<Integer> shardIds,
+      @Nullable final Lock mutableLock,
       SessionKey session) {
 
-    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
+    final IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
     checkNotBlank(shardIds);
     requireNonNull(session);
+    final Response response = emptyResponse();
 
-    Response response = Util.emptyResponse();
-    SessionContext context;
+    final SessionContext context;
     try {
       context = sessionValidator.checkAuthenticated(session, ImmutableSet.of(jobKey.getRole()));
     } catch (AuthFailedException e) {
       return addMessage(response, AUTH_FAILED, e);
     }
 
-    try {
-      lockManager.validateIfLocked(
-          ILockKey.build(LockKey.job(jobKey.newBuilder())),
-          Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
-      schedulerCore.restartShards(jobKey, shardIds, context.getIdentity());
-      response.setResponseCode(OK);
-    } catch (LockException e) {
-      addMessage(response, LOCK_ERROR, e);
-    } catch (ScheduleException e) {
-      addMessage(response, INVALID_REQUEST, e);
-    }
+    return storage.write(new MutateWork.Quiet<Response>() {
+      @Override
+      public Response apply(MutableStoreProvider storeProvider) {
+        try {
+          lockManager.validateIfLocked(
+              ILockKey.build(LockKey.job(jobKey.newBuilder())),
+              Optional.fromNullable(mutableLock).transform(ILock.FROM_BUILDER));
+        } catch (LockException e) {
+          return addMessage(response, LOCK_ERROR, e);
+        }
 
-    return response;
+        Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
+        Set<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
+        if (matchingTasks.size() != shardIds.size()) {
+          return addMessage(response, INVALID_REQUEST, "Not all requested shards are active.");
+        }
+
+        LOG.info("Restarting shards matching " + query);
+        for (String taskId : Tasks.ids(matchingTasks)) {
+          stateManager.changeState(
+              taskId,
+              Optional.<ScheduleStatus>absent(),
+              ScheduleStatus.RESTARTING,
+              restartedByMessage(context.getIdentity()));
+        }
+        return response.setResponseCode(OK);
+      }
+    });
   }
 
   @Override
@@ -1240,6 +1255,11 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   }
 
   @VisibleForTesting
+  static Optional<String> restartedByMessage(String user) {
+    return Optional.of("Restarted by " + user);
+  }
+
+  @VisibleForTesting
   static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill.";
 
   private static Response okEmptyResponse()  {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
index 0206d00..fa611a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/BaseSchedulerCoreImplTest.java
@@ -78,7 +78,6 @@ import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
@@ -500,47 +499,6 @@ public abstract class BaseSchedulerCoreImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testRestartShards() throws Exception {
-    expectKillTask(2);
-    expectTaskNotThrottled().times(2);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 6));
-    changeStatus(Query.jobScoped(KEY_A), ASSIGNED, RUNNING);
-    scheduler.restartShards(KEY_A, ImmutableSet.of(1, 5), OWNER_A.user);
-    assertEquals(4, getTasks(Query.unscoped().byStatus(RUNNING)).size());
-    assertEquals(2, getTasks(Query.unscoped().byStatus(RESTARTING)).size());
-    changeStatus(Query.unscoped().byStatus(RESTARTING), FINISHED);
-    assertEquals(2, getTasks(Query.unscoped().byStatus(PENDING)).size());
-  }
-
-  @Test(expected = ScheduleException.class)
-  public void testRestartNonexistentShard() throws Exception {
-    expectTaskNotThrottled();
-    expectNoCronJob(KEY_A);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
-    changeStatus(Query.jobScoped(KEY_A), ASSIGNED, FINISHED);
-    scheduler.restartShards(KEY_A, ImmutableSet.of(5), OWNER_A.user);
-  }
-
-  @Test
-  public void testRestartPendingShard() throws Exception {
-    expect(cronJobManager.hasJob(KEY_A)).andReturn(false);
-
-    control.replay();
-    buildScheduler();
-
-    scheduler.createJob(makeJob(KEY_A, productionTask().setIsService(true), 1));
-    scheduler.restartShards(KEY_A, ImmutableSet.of(0), OWNER_A.user);
-  }
-
-  @Test
   public void testAuditMessage() throws Exception {
     control.replay();
     buildScheduler();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/824f59d0/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index b8677ae..d34bd6f 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -140,6 +140,7 @@ import static org.apache.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
 import static org.apache.aurora.gen.apiConstants.THRIFT_API_VERSION;
 import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage;
+import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.restartedByMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.transitionMessage;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
@@ -662,11 +663,19 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
 
   @Test
   public void testRestartShards() throws Exception {
-    Set<Integer> shards = ImmutableSet.of(1, 6);
+    Set<Integer> shards = ImmutableSet.of(0);
 
     expectAuth(ROLE, true);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
-    scheduler.restartShards(JOB_KEY, shards, USER);
+    storageUtil.expectTaskFetch(
+        Query.instanceScoped(JOB_KEY, shards).active(),
+        buildScheduledTask());
+
+    expect(stateManager.changeState(
+        TASK_ID,
+        Optional.<ScheduleStatus>absent(),
+        ScheduleStatus.RESTARTING,
+        restartedByMessage(USER))).andReturn(true);
 
     control.replay();
 
@@ -675,6 +684,17 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testRestartShardsAuthFailure() throws Exception {
+    expectAuth(ROLE, false);
+
+    control.replay();
+
+    assertResponse(
+        AUTH_FAILED,
+        thrift.restartShards(JOB_KEY.newBuilder(), ImmutableSet.of(0), DEFAULT_LOCK, SESSION));
+  }
+
+  @Test
   public void testRestartShardsLockCheckFails() throws Exception {
     Set<Integer> shards = ImmutableSet.of(1, 6);
 
@@ -690,20 +710,18 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testRestartShardsFails() throws Exception {
+  public void testRestartShardsNotFoundTasksFailure() throws Exception {
     Set<Integer> shards = ImmutableSet.of(1, 6);
 
-    String message = "Injected.";
     expectAuth(ROLE, true);
-    lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
-    scheduler.restartShards(JOB_KEY, shards, USER);
-    expectLastCall().andThrow(new ScheduleException(message));
+    lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
+    storageUtil.expectTaskFetch(Query.instanceScoped(JOB_KEY, shards).active());
 
     control.replay();
 
-    Response resp = thrift.restartShards(JOB_KEY.newBuilder(), shards, DEFAULT_LOCK, SESSION);
-    assertResponse(INVALID_REQUEST, resp);
-    assertMessageMatches(resp, message);
+    assertResponse(
+        INVALID_REQUEST,
+        thrift.restartShards(JOB_KEY.newBuilder(), shards, LOCK.newBuilder(), SESSION));
   }
 
   @Test