You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/10/09 20:35:01 UTC

git commit: Fixing quota checking for updates.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 18ae90569 -> 7bc791cea


Fixing quota checking for updates.

Bugs closed: AURORA-802

Reviewed at https://reviews.apache.org/r/26425/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/7bc791ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/7bc791ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/7bc791ce

Branch: refs/heads/master
Commit: 7bc791cea1a7f26c9cb019225cecfc3a812e8121
Parents: 18ae905
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Oct 9 11:33:47 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Oct 9 11:33:47 2014 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/quota/QuotaManager.java    | 208 +++++++++++++++----
 .../aurora/scheduler/quota/QuotaUtil.java       | 138 ------------
 .../thrift/SchedulerThriftInterface.java        |  20 +-
 .../scheduler/quota/QuotaManagerImplTest.java   | 187 +++++++++++++----
 .../thrift/SchedulerThriftInterfaceTest.java    |  63 +++---
 5 files changed, 363 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7bc791ce/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 2442b06..5f08997 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -20,11 +20,13 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableRangeSet;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.Sets;
@@ -49,6 +51,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.IRange;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.updater.JobUpdateController;
 
 import static java.util.Objects.requireNonNull;
@@ -77,14 +80,24 @@ public interface QuotaManager {
   QuotaInfo getQuotaInfo(String role);
 
   /**
-   * Checks if there is enough resource quota available for adding production resources specified
-   * in {@code requestedProdResources}. The quota is defined at the task owner (role) level.
+   * Checks if there is enough resource quota available for adding {@code instances} of
+   * {@code template} tasks provided resources consumed by {@code releasedTemplates} tasks
+   * are released. The quota is defined at the task owner (role) level.
    *
-   * @param role Role to check quota for.
-   * @param requestedProdResources Additional production resources requested.
+   * @param template Task resource requirement.
+   * @param instances Number of additional instances requested.
    * @return {@code QuotaComparisonResult} instance with quota check result details.
    */
-  QuotaCheckResult checkQuota(String role, IResourceAggregate requestedProdResources);
+  QuotaCheckResult checkInstanceAddition(ITaskConfig template, int instances);
+
+  /**
+   * Checks if there is enough resource quota available for performing a job update represented
+   * by the {@code jobUpdate}. The quota is defined at the task owner (role) level.
+   *
+   * @param jobUpdate Job update to check quota for.
+   * @return {@code QuotaComparisonResult} instance with quota check result details.
+   */
+  QuotaCheckResult checkJobUpdate(IJobUpdate jobUpdate);
 
   /**
    * Thrown when quota related operation failed.
@@ -128,6 +141,51 @@ public interface QuotaManager {
 
     @Override
     public QuotaInfo getQuotaInfo(final String role) {
+      return getQuotaInfo(role, Optional.<IJobUpdate>absent());
+    }
+
+    @Override
+    public QuotaCheckResult checkInstanceAddition(ITaskConfig template, int instances) {
+      Preconditions.checkArgument(instances >= 0);
+      if (!template.isProduction()) {
+        return new QuotaCheckResult(SUFFICIENT_QUOTA);
+      }
+
+      QuotaInfo quotaInfo = getQuotaInfo(template.getOwner().getRole());
+
+      return QuotaCheckResult.greaterOrEqual(
+          quotaInfo.getQuota(),
+          add(quotaInfo.getProdConsumption(), ResourceAggregates.scale(
+              prodResourcesFromTasks(ImmutableSet.of(template)), instances)));
+    }
+
+    @Override
+    public QuotaCheckResult checkJobUpdate(IJobUpdate jobUpdate) {
+      requireNonNull(jobUpdate);
+      if (!jobUpdate.getInstructions().isSetDesiredState()
+          || !jobUpdate.getInstructions().getDesiredState().getTask().isProduction()) {
+
+        return new QuotaCheckResult(SUFFICIENT_QUOTA);
+      }
+
+      QuotaInfo quotaInfo = getQuotaInfo(
+          jobUpdate.getSummary().getJobKey().getRole(),
+          Optional.of(jobUpdate));
+
+      return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), quotaInfo.getProdConsumption());
+    }
+
+    /**
+     * Gets QuotaInfo with currently allocated quota and actual consumption data.
+     * <p>
+     * In case an optional {@code requestedUpdate} is specified, the production consumption returned
+     * also includes an estimated resources share of that update as if it was already in progress.
+     *
+     * @param role Role to get quota info for.
+     * @param requestedUpdate An optional {@code IJobUpdate} to forecast the prod consumption.
+     * @return {@code QuotaInfo} with quota and consumption details.
+     */
+    private QuotaInfo getQuotaInfo(final String role, final Optional<IJobUpdate> requestedUpdate) {
       return storage.consistentRead(new Work.Quiet<QuotaInfo>() {
         @Override
         public QuotaInfo apply(StoreProvider storeProvider) {
@@ -135,9 +193,10 @@ public interface QuotaManager {
               storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()));
 
           IResourceAggregate prodConsumed =
-              getProdConsumption(storeProvider.getJobUpdateStore(), role, tasks);
+              getProdConsumption(storeProvider.getJobUpdateStore(), role, tasks, requestedUpdate);
 
-          IResourceAggregate nonProdConsumed = QuotaUtil.fromTasks(
+          // TODO(maxim): Consider a similar update-aware approach for computing nonProdConsumed.
+          IResourceAggregate nonProdConsumed = fromTasks(
               tasks.transform(Tasks.SCHEDULED_TO_INFO).filter(Predicates.not(Tasks.IS_PRODUCTION)));
 
           IResourceAggregate quota =
@@ -148,23 +207,11 @@ public interface QuotaManager {
       });
     }
 
-    @Override
-    public QuotaCheckResult checkQuota(String role, IResourceAggregate requestedProdResources) {
-      if (ResourceAggregates.EMPTY.equals(requestedProdResources)) {
-        return new QuotaCheckResult(SUFFICIENT_QUOTA);
-      }
-
-      QuotaInfo quotaInfo = getQuotaInfo(role);
-
-      return QuotaCheckResult.greaterOrEqual(
-          quotaInfo.getQuota(),
-          add(quotaInfo.getProdConsumption(), requestedProdResources));
-    }
-
     private IResourceAggregate getProdConsumption(
         JobUpdateStore jobUpdateStore,
         String role,
-        FluentIterable<IScheduledTask> tasks) {
+        FluentIterable<IScheduledTask> tasks,
+        Optional<IJobUpdate> requestedUpdate) {
 
       // The algorithm here is as follows:
       // 1. Load all production active tasks that belong to jobs without active updates OR
@@ -173,21 +220,29 @@ public interface QuotaManager {
       //    range (e.g. not in JobUpdateInstructions.updateOnlyTheseInstances).
       //    Calculate consumed resources as "nonUpdateConsumption".
       //
-      // 2. Calculate consumed resources from instances affected by the active job updates as
+      // 2. Mix in a requested job update (if present) to correctly calculate prod consumption.
+      //    This would be an update that is not saved in the store yet (i.e. the one quota is
+      //    checked for).
+      //
+      // 3. Calculate consumed resources from instances affected by the active job updates as
       //    "updateConsumption".
       //
-      // 3. Add up the two to yield total prod consumption.
+      // 4. Add up the two to yield total prod consumption.
 
-      final Map<IJobKey, IJobUpdate> roleJobUpdates =
-          fetchActiveJobUpdates(jobUpdateStore, role).uniqueIndex(UPDATE_TO_JOB_KEY);
+      Map<IJobKey, IJobUpdate> updatesByKey = Maps.newHashMap(
+          fetchActiveJobUpdates(jobUpdateStore, role).uniqueIndex(UPDATE_TO_JOB_KEY));
 
-      IResourceAggregate nonUpdateConsumption = QuotaUtil.prodResourcesFromTasks(tasks
-          .filter(buildNonUpdatingTasksFilter(roleJobUpdates))
+      if (requestedUpdate.isPresent()) {
+        updatesByKey.put(requestedUpdate.get().getSummary().getJobKey(), requestedUpdate.get());
+      }
+
+      IResourceAggregate nonUpdateConsumption = prodResourcesFromTasks(tasks
+          .filter(buildNonUpdatingTasksFilter(updatesByKey))
           .transform(Tasks.SCHEDULED_TO_INFO));
 
       IResourceAggregate updateConsumption = ResourceAggregates.EMPTY;
-      for (IJobUpdate update : roleJobUpdates.values()) {
-        updateConsumption = add(updateConsumption, QuotaUtil.prodResourcesFromJobUpdate(update));
+      for (IJobUpdate update : updatesByKey.values()) {
+        updateConsumption = add(updateConsumption, toProdResources(update.getInstructions()));
       }
 
       return add(nonUpdateConsumption, updateConsumption);
@@ -217,14 +272,6 @@ public interface QuotaManager {
       };
     }
 
-    private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
-        new Function<IJobUpdate, IJobKey>() {
-          @Override
-          public IJobKey apply(IJobUpdate input) {
-            return input.getSummary().getJobKey();
-          }
-        };
-
     private static FluentIterable<IJobUpdate> fetchActiveJobUpdates(
         JobUpdateStore jobUpdateStore,
         String role) {
@@ -263,5 +310,92 @@ public interface QuotaManager {
           .setRamMb(a.getRamMb() + b.getRamMb())
           .setDiskMb(a.getDiskMb() + b.getDiskMb()));
     }
+
+    /**
+     * This function calculates max aggregate production resources consumed by the job update
+     * {@code instructions}. The max is calculated between existing and desired task configs on per
+     * resource basis. This means max CPU, RAM and DISK values are computed individually and may
+     * come from different task configurations. While it may not be the most accurate
+     * representation of job update resources during the update, it does guarantee none of the
+     * individual resource values is exceeded during the forward/back roll.
+     *
+     * @param instructions Update instructions with resource definitions.
+     * @return Resources consumed by the update.
+     */
+    private static IResourceAggregate toProdResources(IJobUpdateInstructions instructions) {
+      double existingCpu = 0;
+      int existingRamMb = 0;
+      int existingDiskMb = 0;
+      for (IInstanceTaskConfig group : instructions.getInitialState()) {
+        ITaskConfig task = group.getTask();
+        if (task.isProduction()) {
+          for (IRange range : group.getInstances()) {
+            int numInstances = range.getLast() - range.getFirst() + 1;
+            existingCpu += task.getNumCpus() * numInstances;
+            existingRamMb += task.getRamMb() * numInstances;
+            existingDiskMb += task.getDiskMb() * numInstances;
+          }
+        }
+      }
+
+      // Calculate desired prod task consumption.
+      IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState())
+          .transform(TO_PROD_RESOURCES).or(ResourceAggregates.EMPTY);
+
+      // Calculate result as max(existing, desired) per resource.
+      return IResourceAggregate.build(new ResourceAggregate()
+          .setNumCpus(Math.max(existingCpu, desired.getNumCpus()))
+          .setRamMb(Math.max(existingRamMb, desired.getRamMb()))
+          .setDiskMb(Math.max(existingDiskMb, desired.getDiskMb())));
+    }
+
+    private static IResourceAggregate prodResourcesFromTasks(Iterable<ITaskConfig> tasks) {
+      return fromTasks(FluentIterable.from(tasks).filter(Tasks.IS_PRODUCTION));
+    }
+
+    private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
+      double cpu = 0;
+      int ramMb = 0;
+      int diskMb = 0;
+      for (ITaskConfig task : tasks) {
+        cpu += task.getNumCpus();
+        ramMb += task.getRamMb();
+        diskMb += task.getDiskMb();
+      }
+
+      return IResourceAggregate.build(new ResourceAggregate()
+          .setNumCpus(cpu)
+          .setRamMb(ramMb)
+          .setDiskMb(diskMb));
+    }
+
+    private static final Function<IInstanceTaskConfig, IResourceAggregate> TO_PROD_RESOURCES =
+        new Function<IInstanceTaskConfig, IResourceAggregate>() {
+          @Override
+          public IResourceAggregate apply(IInstanceTaskConfig input) {
+            return input.getTask().isProduction()
+                ? ResourceAggregates.scale(
+                prodResourcesFromTasks(ImmutableSet.of(input.getTask())),
+                getUpdateInstanceCount(input.getInstances()))
+                : ResourceAggregates.EMPTY;
+          }
+        };
+
+    private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
+        new Function<IJobUpdate, IJobKey>() {
+          @Override
+          public IJobKey apply(IJobUpdate input) {
+            return input.getSummary().getJobKey();
+          }
+        };
+
+    private static int getUpdateInstanceCount(Set<IRange> ranges) {
+      int instanceCount = 0;
+      for (IRange range : ranges) {
+        instanceCount += range.getLast() - range.getFirst() + 1;
+      }
+
+      return instanceCount;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7bc791ce/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java
deleted file mode 100644
index d197dd5..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaUtil.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.base.ResourceAggregates;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
-import org.apache.aurora.scheduler.storage.entities.IRange;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Static utility helpers for quota validation.
- */
-public final class QuotaUtil {
-  private QuotaUtil() {
-    // Utility class.
-  }
-
-  /**
-   * Converts a set of production {@link ITaskConfig} templates into a {@link IResourceAggregate}.
-   * <p>
-   * Discards any templates with {@code isProduction = False}.
-   *
-   * @param tasks Set of tasks to convert.
-   * @return Aggregate resources consumed by {@code tasks}.
-   */
-  public static IResourceAggregate prodResourcesFromTasks(Iterable<ITaskConfig> tasks) {
-    return fromTasks(FluentIterable.from(tasks).filter(Tasks.IS_PRODUCTION));
-  }
-
-  /**
-   * Converts a {@link IJobUpdate} into a {@link IResourceAggregate}.
-   * <p>
-   * This function calculates max aggregate production resources consumed by the
-   * {@code jobUpdate}. The max is calculated between existing and desired task configs on per
-   * resource basis. This means max CPU, RAM and DISK values are computed individually and may
-   * come from different task configurations. While it may not be the most accurate representation
-   * of job update resources during the update, it does guarantee none of the individual resource
-   * values is exceeded during the forward/back roll.
-   *
-   * @param jobUpdate Job update to convert.
-   * @return Max production aggregate resources represented by the {@code jobUpdate}.
-   */
-  public static IResourceAggregate prodResourcesFromJobUpdate(IJobUpdate jobUpdate) {
-    IJobUpdateInstructions instructions = jobUpdate.getInstructions();
-
-    // Calculate existing prod task consumption.
-    double existingCpu = 0;
-    int existingRamMb = 0;
-    int existingDiskMb = 0;
-    for (IInstanceTaskConfig group : instructions.getInitialState()) {
-      ITaskConfig task = group.getTask();
-      if (task.isProduction()) {
-        for (IRange range : group.getInstances()) {
-          int numInstances = range.getLast() - range.getFirst() + 1;
-          existingCpu += task.getNumCpus() * numInstances;
-          existingRamMb += task.getRamMb() * numInstances;
-          existingDiskMb += task.getDiskMb() * numInstances;
-        }
-      }
-    }
-
-    // Calculate desired prod task consumption.
-    IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState())
-        .transform(TO_PROD_RESOURCES).or(ResourceAggregates.EMPTY);
-
-    // Calculate result as max(existing, desired) per resource.
-    return IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(Math.max(existingCpu, desired.getNumCpus()))
-        .setRamMb(Math.max(existingRamMb, desired.getRamMb()))
-        .setDiskMb(Math.max(existingDiskMb, desired.getDiskMb())));
-  }
-
-  /**
-   * Converts a set of {@link ITaskConfig} templates into a {@link IResourceAggregate}.
-   * <p>
-   * @param tasks Set of tasks to convert.
-   * @return Aggregate resources consumed by {@code tasks}.
-   */
-  public static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
-    double cpu = 0;
-    int ramMb = 0;
-    int diskMb = 0;
-    for (ITaskConfig task : tasks) {
-      cpu += task.getNumCpus();
-      ramMb += task.getRamMb();
-      diskMb += task.getDiskMb();
-    }
-
-    return IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(cpu)
-        .setRamMb(ramMb)
-        .setDiskMb(diskMb));
-  }
-
-  private static final Function<IInstanceTaskConfig, IResourceAggregate> TO_PROD_RESOURCES =
-      new Function<IInstanceTaskConfig, IResourceAggregate>() {
-        @Override
-        public IResourceAggregate apply(IInstanceTaskConfig input) {
-          return input.getTask().isProduction()
-              ? ResourceAggregates.scale(
-                  prodResourcesFromTasks(ImmutableSet.of(input.getTask())),
-                  getUpdateInstanceCount(input.getInstances()))
-              : ResourceAggregates.EMPTY;
-        }
-      };
-
-  private static int getUpdateInstanceCount(Set<IRange> ranges) {
-    int instanceCount = 0;
-    for (IRange range : ranges) {
-      instanceCount += range.getLast() - range.getFirst() + 1;
-    }
-
-    return instanceCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7bc791ce/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 5dcae4a..137f97d 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -114,7 +114,6 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Jobs;
 import org.apache.aurora.scheduler.base.Numbers;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
@@ -130,7 +129,6 @@ import org.apache.aurora.scheduler.quota.QuotaCheckResult;
 import org.apache.aurora.scheduler.quota.QuotaInfo;
 import org.apache.aurora.scheduler.quota.QuotaManager;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
-import org.apache.aurora.scheduler.quota.QuotaUtil;
 import org.apache.aurora.scheduler.state.LockManager;
 import org.apache.aurora.scheduler.state.LockManager.LockException;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -313,9 +311,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           ITaskConfig taskConfig = sanitized.getJobConfig().getTaskConfig();
           int instanceCount = sanitized.getInstanceIds().size();
 
-          validateTaskLimits(taskConfig, instanceCount, ResourceAggregates.scale(
-              QuotaUtil.prodResourcesFromTasks(ImmutableSet.of(taskConfig)),
-              instanceCount));
+          validateTaskLimits(
+              taskConfig,
+              instanceCount,
+              quotaManager.checkInstanceAddition(taskConfig, instanceCount));
 
           // TODO(mchucarroll): deprecate cron as a part of create/kill job.(AURORA-454)
           if (sanitized.isCron()) {
@@ -1205,9 +1204,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           validateTaskLimits(
               task,
               currentTasks.size() + config.getInstanceIdsSize(),
-              ResourceAggregates.scale(
-                  QuotaUtil.prodResourcesFromTasks(ImmutableSet.of(task)),
-                  config.getInstanceIdsSize()));
+              quotaManager.checkInstanceAddition(task, config.getInstanceIdsSize()));
 
           stateManager.insertPendingTasks(task, ImmutableSet.copyOf(config.getInstanceIds()));
           return okEmptyResponse();
@@ -1291,7 +1288,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   private void validateTaskLimits(
       ITaskConfig task,
       int totalInstances,
-      IResourceAggregate requestedProdResources) throws TaskValidationException {
+      QuotaCheckResult quotaCheck) throws TaskValidationException {
 
     if (totalInstances <= 0 || totalInstances > MAX_TASKS_PER_JOB.get()) {
       throw new TaskValidationException(String.format(
@@ -1306,9 +1303,6 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           "Task ID is too long, please shorten your role or job name.");
     }
 
-    QuotaCheckResult quotaCheck =
-        quotaManager.checkQuota(task.getOwner().getRole(), requestedProdResources);
-
     if (quotaCheck.getResult() == INSUFFICIENT_QUOTA) {
       throw new TaskValidationException("Insufficient resource quota: "
           + quotaCheck.getDetails().or(""));
@@ -1450,7 +1444,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
           validateTaskLimits(
               request.getTaskConfig(),
               request.getInstanceCount(),
-              QuotaUtil.prodResourcesFromJobUpdate(update));
+              quotaManager.checkJobUpdate(update));
 
           jobUpdateController.start(update, context.getIdentity());
           return okResponse(Result.startJobUpdateResult(new StartJobUpdateResult(updateId)));

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7bc791ce/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index b58c8f3..33790b1 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -86,7 +86,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     expectQuota(quota);
     expectTasks(prodTask, nonProdTask);
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
 
     control.replay();
 
@@ -109,7 +109,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     expectQuota(quota);
     expectTasks(prodTask, updatingProdTask, updatingFilteredProdTask, nonProdTask);
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
 
     control.replay();
 
@@ -145,11 +145,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaPasses() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectTasks(createProdTask("foo", 2, 2, 2));
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -157,11 +157,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaPassesNoTasks() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(4, 4, 4)));
     expectNoTasks();
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(1, 1, 1, true));
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(1, 1, 1, true));
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -173,7 +173,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -185,7 +185,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -198,7 +198,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -206,7 +206,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaSkippedForNonProdRequest() {
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, ResourceAggregates.EMPTY);
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, false), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -220,7 +220,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
   }
 
@@ -232,7 +232,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(2, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(2, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertTrue(checkQuota.getDetails().get().contains("CPU"));
   }
@@ -245,7 +245,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 2, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 2, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertTrue(checkQuota.getDetails().get().contains("RAM"));
   }
@@ -258,7 +258,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 2));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 2, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertTrue(checkQuota.getDetails().get().contains("DISK"));
   }
@@ -269,11 +269,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createTask(JOB_NAME, "id2", 3, 3, 3, true, 0))
         .times(2);
 
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(2, 2, 2, true), 2);
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(2, 2, 2, true), 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
@@ -285,11 +285,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
 
-    expectJobUpdates(createTaskConfig(8, 8, 8, false), createTaskConfig(4, 4, 4, false), 2);
+    expectJobUpdates(taskConfig(8, 8, 8, false), taskConfig(4, 4, 4, false), 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
@@ -301,11 +301,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 1, 1, 1)).times(2);
 
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), createTaskConfig(4, 4, 4, false), 2);
+    expectJobUpdates(taskConfig(1, 1, 1, true), taskConfig(4, 4, 4, false), 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
@@ -317,11 +317,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(5, 5, 5))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
 
-    expectJobUpdates(createTaskConfig(1, 1, 1, false), createTaskConfig(1, 1, 1, true), 2);
+    expectJobUpdates(taskConfig(1, 1, 1, false), taskConfig(1, 1, 1, true), 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(5, 5, 5)),
@@ -332,11 +332,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaOldJobUpdateConfigMatters() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
-    expectJobUpdates(createTaskConfig(2, 2, 2, true), createTaskConfig(1, 1, 1, true), 2);
+    expectJobUpdates(taskConfig(2, 2, 2, true), taskConfig(1, 1, 1, true), 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
@@ -347,11 +347,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaUpdateAddsInstances() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), 1, createTaskConfig(1, 1, 1, true), 2, 2);
+    expectJobUpdates(taskConfig(1, 1, 1, true), 1, taskConfig(1, 1, 1, true), 2, 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
@@ -362,11 +362,11 @@ public class QuotaManagerImplTest extends EasyMockTest {
   public void testCheckQuotaUpdateRemovesInstances() {
     expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
-    expectJobUpdates(createTaskConfig(1, 1, 1, true), 2, createTaskConfig(1, 1, 1, true), 1, 2);
+    expectJobUpdates(taskConfig(1, 1, 1, true), 2, taskConfig(1, 1, 1, true), 1, 2);
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
@@ -379,7 +379,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask(JOB_NAME, 2, 2, 2)).times(2);
 
     String updateId = "u1";
-    ITaskConfig config = createTaskConfig(2, 2, 2, true);
+    ITaskConfig config = taskConfig(2, 2, 2, true);
     List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(config));
     IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
     JobUpdate builder = update.newBuilder();
@@ -393,7 +393,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
@@ -406,7 +406,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask(JOB_NAME, 2, 2, 2)).times(2);
 
     String updateId = "u1";
-    ITaskConfig config = createTaskConfig(2, 2, 2, true);
+    ITaskConfig config = taskConfig(2, 2, 2, true);
     List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(config));
     IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
     JobUpdate builder = update.newBuilder();
@@ -420,7 +420,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
@@ -433,7 +433,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
     expectTasks(createProdTask("foo", 2, 2, 2), createProdTask("bar", 2, 2, 2)).times(2);
 
     String updateId = "u1";
-    ITaskConfig config = createTaskConfig(2, 2, 2, true);
+    ITaskConfig config = taskConfig(2, 2, 2, true);
     List<IJobUpdateSummary> summaries = buildJobUpdateSummaries(updateId, JobKeys.from(config));
     IJobUpdate update = buildJobUpdate(summaries.get(0), config, 1, config, 1);
     JobUpdate builder = update.newBuilder();
@@ -447,7 +447,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    QuotaCheckResult checkQuota = quotaManager.checkQuota(ROLE, prodResource(1, 1, 1));
+    QuotaCheckResult checkQuota = quotaManager.checkInstanceAddition(taskConfig(1, 1, 1, true), 1);
     assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
     assertEquals(
         IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
@@ -455,6 +455,115 @@ public class QuotaManagerImplTest extends EasyMockTest {
   }
 
   @Test
+  public void testCheckQuotaNewInPlaceUpdate() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
+    expectTasks(
+        createProdTask("foo", 2, 2, 2),
+        createTask(JOB_NAME, "id1", 2, 2, 2, true, 0),
+        createTask(JOB_NAME, "id12", 2, 2, 2, true, 12)).times(2);
+    expectNoJobUpdates().times(2);
+
+    ITaskConfig config = taskConfig(1, 1, 1, true);
+    IJobUpdate update = buildJobUpdate(
+        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        taskConfig(2, 2, 2, true),
+        1,
+        config,
+        1);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaNewUpdateAddsInstances() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
+    expectTasks(createProdTask("foo", 2, 2, 2), createProdTask(JOB_NAME, 2, 2, 2)).times(2);
+    expectNoJobUpdates().times(2);
+
+    ITaskConfig config = taskConfig(2, 2, 2, true);
+    IJobUpdate update = buildJobUpdate(
+        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        config,
+        1,
+        config,
+        3);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
+    assertEquals(INSUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(4, 4, 4)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaNewUpdateRemovesInstances() {
+    expectQuota(IResourceAggregate.build(new ResourceAggregate(6, 6, 6))).times(2);
+    expectTasks(
+        createProdTask("foo", 2, 2, 2),
+        createTask(JOB_NAME, "id1", 2, 2, 2, true, 0),
+        createTask(JOB_NAME, "id2", 2, 2, 2, true, 1)).times(2);
+    expectNoJobUpdates().times(2);
+
+    ITaskConfig config = taskConfig(2, 2, 2, true);
+    IJobUpdate update = buildJobUpdate(
+        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        config,
+        1,
+        config,
+        1);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+    assertEquals(
+        IResourceAggregate.build(new ResourceAggregate(6, 6, 6)),
+        quotaManager.getQuotaInfo(ROLE).getProdConsumption());
+  }
+
+  @Test
+  public void testCheckQuotaNewUpdateSkippedForNonProdDesiredState() {
+    ITaskConfig config = taskConfig(2, 2, 2, false);
+    IJobUpdate update = buildJobUpdate(
+        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        taskConfig(2, 2, 2, true),
+        1,
+        config,
+        1);
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(update);
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+  }
+
+  @Test
+  public void testCheckQuotaNewUpdateSkippedForEmptyDesiredState() {
+    ITaskConfig config = taskConfig(2, 2, 2, true);
+    IJobUpdate update = buildJobUpdate(
+        buildJobUpdateSummaries("u1", JobKeys.from(config)).get(0),
+        config,
+        1,
+        config,
+        1);
+    JobUpdate updateBuilder = update.newBuilder();
+    updateBuilder.getInstructions().unsetDesiredState();
+
+    control.replay();
+
+    QuotaCheckResult checkQuota = quotaManager.checkJobUpdate(IJobUpdate.build(updateBuilder));
+    assertEquals(SUFFICIENT_QUOTA, checkQuota.getResult());
+  }
+
+  @Test
   public void testSaveQuotaPasses() throws Exception {
     storageUtil.quotaStore.saveQuota(ROLE, QUOTA);
 
@@ -530,8 +639,8 @@ public class QuotaManagerImplTest extends EasyMockTest {
                 .setInstances(ImmutableSet.of(new Range(0, intialInstances - 1)))))));
   }
 
-  private void expectNoJobUpdates() {
-    expect(jobUpdateStore.fetchJobUpdateSummaries(
+  private IExpectationSetters<?> expectNoJobUpdates() {
+    return expect(jobUpdateStore.fetchJobUpdateSummaries(
         QuotaManagerImpl.updateQuery(ROLE))).andReturn(ImmutableList.<IJobUpdateSummary>of());
   }
 
@@ -544,11 +653,7 @@ public class QuotaManagerImplTest extends EasyMockTest {
         .andReturn(Optional.of(quota));
   }
 
-  private IResourceAggregate prodResource(double cpu, long ram, long disk) {
-    return IResourceAggregate.build(new ResourceAggregate(cpu, ram, disk));
-  }
-
-  private ITaskConfig createTaskConfig(int cpus, int ramMb, int diskMb, boolean production) {
+  private ITaskConfig taskConfig(int cpus, int ramMb, int diskMb, boolean production) {
     return createTask(JOB_NAME, "newId", cpus, ramMb, diskMb, production, 0)
         .getAssignedTask()
         .getTask();

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/7bc791ce/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 02cd8f7..c3f0bbe 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -167,6 +167,7 @@ import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.MAX_TA
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.killedByMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.restartedByMessage;
 import static org.apache.aurora.scheduler.thrift.SchedulerThriftInterface.transitionMessage;
+import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -198,9 +199,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final IResourceAggregate CONSUMED =
       IResourceAggregate.build(new ResourceAggregate(0.0, 0, 0));
 
-  private static final IResourceAggregate DEFAULT_CONSUMED =
-      IResourceAggregate.build(new ResourceAggregate(1, 1024, 1024));
-
   private static final QuotaCheckResult ENOUGH_QUOTA = new QuotaCheckResult(SUFFICIENT_QUOTA);
   private static final QuotaCheckResult NOT_ENOUGH_QUOTA = new QuotaCheckResult(INSUFFICIENT_QUOTA);
 
@@ -341,7 +339,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(ENOUGH_QUOTA);
 
     stateManager.insertPendingTasks(
         sanitized.getJobConfig().getTaskConfig(),
@@ -362,7 +361,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(ENOUGH_QUOTA);
 
     stateManager.insertPendingTasks(
         sanitized.getJobConfig().getTaskConfig(),
@@ -383,7 +383,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(ENOUGH_QUOTA);
 
     cronJobManager.createJob(anyObject(SanitizedCronJob.class));
 
@@ -403,7 +404,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(ENOUGH_QUOTA);
 
     stateManager.insertPendingTasks(
         sanitized.getJobConfig().getTaskConfig(),
@@ -482,6 +484,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+    expect(quotaManager.checkInstanceAddition(anyObject(ITaskConfig.class), anyInt()))
+        .andReturn(ENOUGH_QUOTA);
 
     control.replay();
 
@@ -496,6 +500,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     lockManager.validateIfLocked(LOCK_KEY, Optional.<ILock>absent());
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
+    expect(quotaManager.checkInstanceAddition(anyObject(ITaskConfig.class), anyInt()))
+        .andReturn(ENOUGH_QUOTA);
     expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(Strings.repeat("a", MAX_TASK_ID_LENGTH + 1));
 
@@ -514,7 +520,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(taskIdGenerator.generate(sanitized.getJobConfig().getTaskConfig(), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(NOT_ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(sanitized.getJobConfig().getTaskConfig(), 1))
+        .andReturn(NOT_ENOUGH_QUOTA);
 
     control.replay();
 
@@ -651,7 +658,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(taskIdGenerator.generate(ITaskConfig.build(sanitized.getTaskConfig()), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(ITaskConfig.build(sanitized.getTaskConfig()), 1))
+        .andReturn(ENOUGH_QUOTA);
     stateManager.insertPendingTasks(
         ITaskConfig.build(sanitized.getTaskConfig()),
         ImmutableSet.of(0));
@@ -1799,7 +1807,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expect(taskIdGenerator.generate(populatedTask, 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(ENOUGH_QUOTA);
     stateManager.insertPendingTasks(populatedTask, ImmutableSet.of(0));
 
     control.replay();
@@ -1817,7 +1825,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expect(taskIdGenerator.generate(populatedTask, 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(ENOUGH_QUOTA);
     stateManager.insertPendingTasks(populatedTask, ImmutableSet.of(0));
 
     control.replay();
@@ -1899,6 +1907,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     lockManager.validateIfLocked(LOCK_KEY, Optional.of(LOCK));
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
+    expect(quotaManager.checkInstanceAddition(anyObject(ITaskConfig.class), anyInt()))
+        .andReturn(ENOUGH_QUOTA);
     expect(taskIdGenerator.generate(populatedTask, 1))
         .andReturn(Strings.repeat("a", MAX_TASK_ID_LENGTH + 1));
 
@@ -1917,7 +1927,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expect(taskIdGenerator.generate(populatedTask, 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(NOT_ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(NOT_ENOUGH_QUOTA);
 
     control.replay();
 
@@ -1934,7 +1944,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     storageUtil.expectTaskFetch(Query.jobScoped(JOB_KEY).active());
     expect(taskIdGenerator.generate(populatedTask, 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, DEFAULT_CONSUMED)).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkInstanceAddition(populatedTask, 1)).andReturn(ENOUGH_QUOTA);
     stateManager.insertPendingTasks(populatedTask, ImmutableSet.of(0));
     expectLastCall().andThrow(new IllegalArgumentException("instance collision"));
 
@@ -2116,9 +2126,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     ITaskConfig newTask = buildScheduledTask(0, 8).getAssignedTask().getTask();
     expect(taskIdGenerator.generate(newTask, 6))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(
-        ROLE,
-        IResourceAggregate.build(new ResourceAggregate(7, 48, 7168)))).andReturn(ENOUGH_QUOTA);
 
     IScheduledTask oldTask1 = buildScheduledTask(0, 5);
     IScheduledTask oldTask2 = buildScheduledTask(1, 5);
@@ -2132,6 +2139,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
         oldTask1.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 1), new Range(4, 6)),
         oldTask3.getAssignedTask().getTask(), ImmutableSet.of(new Range(2, 3))
     ));
+
+    expect(quotaManager.checkJobUpdate(update)).andReturn(ENOUGH_QUOTA);
     expect(uuidGenerator.createNew()).andReturn(UU_ID);
     storageUtil.expectTaskFetch(
         Query.unscoped().byJob(JOB_KEY).active(),
@@ -2160,9 +2169,6 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     ITaskConfig newTask = buildScheduledTask(0, 5).getAssignedTask().getTask();
     expect(taskIdGenerator.generate(newTask, 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(
-        ROLE,
-        IResourceAggregate.build(new ResourceAggregate(1, 5, 1024)))).andReturn(ENOUGH_QUOTA);
 
     IScheduledTask oldTask1 = buildScheduledTask(0, 5);
     IScheduledTask oldTask2 = buildScheduledTask(1, 5);
@@ -2171,6 +2177,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     IJobUpdate update = buildJobUpdate(1, newTask, ImmutableMap.of(
         oldTask1.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 1))));
 
+    expect(quotaManager.checkJobUpdate(anyObject(IJobUpdate.class))).andReturn(ENOUGH_QUOTA);
+
     // Set diff-adjusted IJobUpdate expectations.
     JobUpdate expected = update.newBuilder();
     expected.getInstructions().setInitialState(ImmutableSet.of(
@@ -2343,6 +2351,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(uuidGenerator.createNew()).andReturn(UU_ID);
     storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active());
+    expect(quotaManager.checkJobUpdate(anyObject(IJobUpdate.class))).andReturn(ENOUGH_QUOTA);
 
     control.replay();
 
@@ -2356,6 +2365,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(uuidGenerator.createNew()).andReturn(UU_ID);
     storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active());
+    expect(quotaManager.checkJobUpdate(anyObject(IJobUpdate.class))).andReturn(ENOUGH_QUOTA);
     expect(taskIdGenerator.generate(ITaskConfig.build(request.getTaskConfig()), 6))
         .andReturn(Strings.repeat("a", MAX_TASK_ID_LENGTH + 1));
 
@@ -2370,11 +2380,17 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expectAuth(ROLE, true);
     expect(cronJobManager.hasJob(JOB_KEY)).andReturn(false);
     expect(uuidGenerator.createNew()).andReturn(UU_ID);
-    storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active());
+
+    IScheduledTask oldTask = buildScheduledTask(0, 5);
+    storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active(), oldTask);
     expect(taskIdGenerator.generate(ITaskConfig.build(request.getTaskConfig()), 6))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, IResourceAggregate.build(
-        new ResourceAggregate(6, 6144, 6144)))).andReturn(NOT_ENOUGH_QUOTA);
+
+    ITaskConfig config = ITaskConfig.build(request.getTaskConfig());
+    IJobUpdate update = buildJobUpdate(6, config, ImmutableMap.of(
+        oldTask.getAssignedTask().getTask(), ImmutableSet.of(new Range(0, 0))));
+
+    expect(quotaManager.checkJobUpdate(update)).andReturn(NOT_ENOUGH_QUOTA);
 
     control.replay();
 
@@ -2396,8 +2412,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
     expect(uuidGenerator.createNew()).andReturn(UU_ID);
     expect(taskIdGenerator.generate(ITaskConfig.build(populatedTask()), 1))
         .andReturn(TASK_ID);
-    expect(quotaManager.checkQuota(ROLE, IResourceAggregate.build(
-        new ResourceAggregate(1, 1024, 1024)))).andReturn(ENOUGH_QUOTA);
+    expect(quotaManager.checkJobUpdate(update)).andReturn(ENOUGH_QUOTA);
     storageUtil.expectTaskFetch(Query.unscoped().byJob(JOB_KEY).active(), oldTask);
     jobUpdateController.start(update, USER);
     expectLastCall().andThrow(new UpdateStateException("failed"));