You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/01 19:24:34 UTC
aurora git commit: Refine types used in QuotaManager, share more functions/predicates.
Repository: aurora
Updated Branches:
refs/heads/master 14e7b84f4 -> 4b9c759cf
Refine types used in QuotaManager, share more functions/predicates.
Reviewed at https://reviews.apache.org/r/32371/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4b9c759c
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4b9c759c
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4b9c759c
Branch: refs/heads/master
Commit: 4b9c759cf3868b1b89e5411fd7ed782d2e5f81e0
Parents: 14e7b84
Author: Bill Farner <wf...@apache.org>
Authored: Wed Apr 1 10:22:18 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Apr 1 10:22:18 2015 -0700
----------------------------------------------------------------------
.../aurora/scheduler/quota/QuotaManager.java | 306 ++++++++++---------
.../aurora/scheduler/updater/UpdateFactory.java | 10 +-
.../aurora/scheduler/updater/Updates.java | 22 ++
3 files changed, 183 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 39e930c..7453680 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -13,7 +13,7 @@
*/
package org.apache.aurora.scheduler.quota;
-import java.util.List;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@@ -24,13 +24,11 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
-import com.google.common.collect.Sets;
import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.ResourceAggregate;
@@ -40,6 +38,7 @@ import org.apache.aurora.scheduler.base.ResourceAggregates;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -49,17 +48,19 @@ import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
import org.apache.aurora.scheduler.storage.entities.IRange;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.updater.Updates;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.base.ResourceAggregates.EMPTY;
+import static org.apache.aurora.scheduler.base.Tasks.ASSIGNED_TO_INFO;
+import static org.apache.aurora.scheduler.base.Tasks.ASSIGNED_TO_JOB_KEY;
import static org.apache.aurora.scheduler.base.Tasks.INFO_TO_JOB_KEY;
import static org.apache.aurora.scheduler.base.Tasks.IS_PRODUCTION;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_INFO;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA;
+import static org.apache.aurora.scheduler.updater.Updates.getInstanceIds;
/**
* Allows access to resource quotas, and tracks quota consumption.
@@ -81,7 +82,7 @@ public interface QuotaManager {
*
* @param role Quota owner.
* @param storeProvider A store provider to access quota data.
- * @return {@code QuotaInfo} instance.
+ * @return quota usage information for the given role.
*/
QuotaInfo getQuotaInfo(String role, StoreProvider storeProvider);
@@ -93,7 +94,7 @@ public interface QuotaManager {
* @param template Task resource requirement.
* @param instances Number of additional instances requested.
* @param storeProvider A store provider to access quota data.
- * @return {@code QuotaComparisonResult} instance with quota check result details.
+ * @return quota check result details.
*/
QuotaCheckResult checkInstanceAddition(
ITaskConfig template,
@@ -106,7 +107,7 @@ public interface QuotaManager {
*
* @param jobUpdate Job update to check quota for.
* @param storeProvider A store provider to access quota data.
- * @return {@code QuotaComparisonResult} instance with quota check result details.
+ * @return quota check result details.
*/
QuotaCheckResult checkJobUpdate(IJobUpdate jobUpdate, StoreProvider storeProvider);
@@ -116,7 +117,7 @@ public interface QuotaManager {
*
* @param cronConfig Cron job configuration.
* @param storeProvider A store provider to access quota data.
- * @return{@code QuotaComparisonResult} instance with quota check result details.
+ * @return quota check result details.
*/
QuotaCheckResult checkCronUpdate(IJobConfiguration cronConfig, StoreProvider storeProvider);
@@ -233,18 +234,20 @@ public interface QuotaManager {
Optional<IJobUpdate> requestedUpdate,
StoreProvider storeProvider) {
- FluentIterable<IScheduledTask> tasks = FluentIterable.from(
- storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()));
+ FluentIterable<IAssignedTask> tasks = FluentIterable
+ .from(storeProvider.getTaskStore().fetchTasks(Query.roleScoped(role).active()))
+ .transform(SCHEDULED_TO_ASSIGNED);
- Map<IJobKey, IJobUpdate> updates = Maps.newHashMap(
- fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role)
- .uniqueIndex(UPDATE_TO_JOB_KEY));
+ Map<IJobKey, IJobUpdateInstructions> updates = Maps.newHashMap(
+ fetchActiveJobUpdates(storeProvider.getJobUpdateStore(), role));
// Mix in a requested job update (if present) to correctly calculate consumption.
// This would be an update that is not saved in the store yet (i.e. the one quota is
// checked for).
if (requestedUpdate.isPresent()) {
- updates.put(requestedUpdate.get().getSummary().getKey().getJob(), requestedUpdate.get());
+ updates.put(
+ requestedUpdate.get().getSummary().getKey().getJob(),
+ requestedUpdate.get().getInstructions());
}
Map<IJobKey, IJobConfiguration> cronTemplates =
@@ -262,32 +265,47 @@ public interface QuotaManager {
return new QuotaInfo(quota, prodConsumed, nonProdConsumed);
}
+ private static final Function<IJobConfiguration, ITaskConfig> JOB_TO_TASK =
+ new Function<IJobConfiguration, ITaskConfig>() {
+ @Override
+ public ITaskConfig apply(IJobConfiguration job) {
+ return job.getTaskConfig();
+ }
+ };
+
private IResourceAggregate getConsumption(
- FluentIterable<IScheduledTask> tasks,
- Map<IJobKey, IJobUpdate> updatesByKey,
+ FluentIterable<IAssignedTask> tasks,
+ Map<IJobKey, IJobUpdateInstructions> updatesByKey,
Map<IJobKey, IJobConfiguration> cronTemplatesByKey,
boolean isProd) {
Predicate<ITaskConfig> prodFilter = isProd ? IS_PRODUCTION : Predicates.not(IS_PRODUCTION);
- FluentIterable<IScheduledTask> filteredTasks =
- tasks.filter(Predicates.compose(prodFilter, SCHEDULED_TO_INFO));
+ FluentIterable<IAssignedTask> filteredTasks =
+ tasks.filter(Predicates.compose(prodFilter, ASSIGNED_TO_INFO));
+
+ Predicate<IAssignedTask> excludeCron = Predicates.compose(
+ Predicates.not(Predicates.in(cronTemplatesByKey.keySet())),
+ ASSIGNED_TO_JOB_KEY);
IResourceAggregate nonCronConsumption = getNonCronConsumption(
updatesByKey,
- excludeCronTasks(filteredTasks, cronTemplatesByKey),
- isProd);
+ filteredTasks.filter(excludeCron),
+ prodFilter);
- IResourceAggregate cronConsumption =
- getCronConsumption(cronTemplatesByKey, filteredTasks, isProd);
+ IResourceAggregate cronConsumption = getCronConsumption(
+ Iterables.filter(
+ cronTemplatesByKey.values(),
+ Predicates.compose(prodFilter, JOB_TO_TASK)),
+ filteredTasks.transform(ASSIGNED_TO_INFO));
return add(nonCronConsumption, cronConsumption);
}
private static IResourceAggregate getNonCronConsumption(
- Map<IJobKey, IJobUpdate> updatesByKey,
- FluentIterable<IScheduledTask> tasks,
- boolean isProd) {
+ Map<IJobKey, IJobUpdateInstructions> updatesByKey,
+ FluentIterable<IAssignedTask> tasks,
+ final Predicate<ITaskConfig> configFilter) {
// 1. Get all active tasks that belong to jobs without active updates OR unaffected by an
// active update working set. An example of the latter would be instances not updated by
@@ -302,21 +320,20 @@ public interface QuotaManager {
IResourceAggregate nonUpdateConsumption = fromTasks(tasks
.filter(buildNonUpdatingTasksFilter(updatesByKey))
- .transform(SCHEDULED_TO_INFO));
+ .transform(ASSIGNED_TO_INFO));
- IResourceAggregate updateConsumption = EMPTY;
- for (IJobUpdate update : updatesByKey.values()) {
- updateConsumption =
- add(updateConsumption, instructionsToResources(update.getInstructions(), isProd));
- }
+ final Predicate<IInstanceTaskConfig> instanceFilter =
+ Predicates.compose(configFilter, INSTANCE_CONFIG);
+
+ IResourceAggregate updateConsumption =
+ addAll(Iterables.transform(updatesByKey.values(), updateResources(instanceFilter)));
return add(nonUpdateConsumption, updateConsumption);
}
private static IResourceAggregate getCronConsumption(
- Map<IJobKey, IJobConfiguration> cronTemplates,
- FluentIterable<IScheduledTask> tasks,
- boolean isProd) {
+ Iterable<IJobConfiguration> cronTemplates,
+ FluentIterable<ITaskConfig> tasks) {
// Calculate the overall cron consumption as MAX between cron template resources and active
// cron tasks. This is required to account for a case when a running cron task has higher
@@ -326,52 +343,36 @@ public interface QuotaManager {
// cron scheduling, it's the simplest approach possible given the system constraints (e.g.:
// lack of enforcement on a cron job run duration).
- Multimap<IJobKey, ITaskConfig> taskConfigsByKey =
- tasks.transform(SCHEDULED_TO_INFO).index(INFO_TO_JOB_KEY);
-
- IResourceAggregate totalConsumption = EMPTY;
- for (IJobConfiguration config : cronTemplates.values()) {
- if (isProd == config.getTaskConfig().isProduction()) {
- IResourceAggregate templateConsumption =
- scale(config.getTaskConfig(), config.getInstanceCount());
-
- IResourceAggregate taskConsumption = fromTasks(taskConfigsByKey.get(config.getKey()));
-
- totalConsumption = add(totalConsumption, max(templateConsumption, taskConsumption));
- }
- }
- return totalConsumption;
- }
-
- private static FluentIterable<IScheduledTask> excludeCronTasks(
- FluentIterable<IScheduledTask> tasks,
- final Map<IJobKey, IJobConfiguration> cronJobs) {
-
- return tasks.filter(new Predicate<IScheduledTask>() {
- @Override
- public boolean apply(IScheduledTask input) {
- return !cronJobs.containsKey(input.getAssignedTask().getTask().getJob());
- }
- });
+ final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(INFO_TO_JOB_KEY);
+ return addAll(Iterables.transform(
+ cronTemplates,
+ new Function<IJobConfiguration, IResourceAggregate>() {
+ @Override
+ public IResourceAggregate apply(IJobConfiguration config) {
+ return max(
+ scale(config.getTaskConfig(), config.getInstanceCount()),
+ fromTasks(taskConfigsByKey.get(config.getKey())));
+ }
+ }));
}
- private static Predicate<IScheduledTask> buildNonUpdatingTasksFilter(
- final Map<IJobKey, IJobUpdate> roleJobUpdates) {
+ private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter(
+ final Map<IJobKey, IJobUpdateInstructions> roleJobUpdates) {
- return new Predicate<IScheduledTask>() {
+ return new Predicate<IAssignedTask>() {
@Override
- public boolean apply(IScheduledTask input) {
- Optional<IJobUpdate> update = Optional.fromNullable(
- roleJobUpdates.get(input.getAssignedTask().getTask().getJob()));
+ public boolean apply(IAssignedTask task) {
+ Optional<IJobUpdateInstructions> update = Optional.fromNullable(
+ roleJobUpdates.get(task.getTask().getJob()));
if (update.isPresent()) {
- IJobUpdateInstructions instructions = update.get().getInstructions();
- RangeSet<Integer> initialInstances = instanceRangeSet(instructions.getInitialState());
- RangeSet<Integer> desiredInstances = instanceRangeSet(instructions.isSetDesiredState()
+ IJobUpdateInstructions instructions = update.get();
+ RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState());
+ RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState()
? ImmutableSet.of(instructions.getDesiredState())
: ImmutableSet.<IInstanceTaskConfig>of());
- int instanceId = input.getAssignedTask().getInstanceId();
+ int instanceId = task.getInstanceId();
return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId);
}
return true;
@@ -379,18 +380,31 @@ public interface QuotaManager {
};
}
- private static FluentIterable<IJobUpdate> fetchActiveJobUpdates(
- JobUpdateStore jobUpdateStore,
- String role) {
+ private static final Function<IJobUpdate, IJobUpdateInstructions> UPDATE_TO_INSTRUCTIONS =
+ new Function<IJobUpdate, IJobUpdateInstructions>() {
+ @Override
+ public IJobUpdateInstructions apply(IJobUpdate update) {
+ return update.getInstructions();
+ }
+ };
- List<IJobUpdateSummary> summaries = jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role));
+ private static Map<IJobKey, IJobUpdateInstructions> fetchActiveJobUpdates(
+ final JobUpdateStore jobUpdateStore,
+ String role) {
- Set<IJobUpdate> updates = Sets.newHashSet();
- for (IJobUpdateSummary summary : summaries) {
- updates.add(jobUpdateStore.fetchJobUpdate(summary.getKey()).get());
- }
+ Function<IJobUpdateSummary, IJobUpdate> fetchUpdate =
+ new Function<IJobUpdateSummary, IJobUpdate>() {
+ @Override
+ public IJobUpdate apply(IJobUpdateSummary summary) {
+ return jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
+ }
+ };
- return FluentIterable.from(updates);
+ return Maps.transformValues(
+ FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)))
+ .transform(fetchUpdate)
+ .uniqueIndex(UPDATE_TO_JOB_KEY),
+ UPDATE_TO_INSTRUCTIONS);
}
@VisibleForTesting
@@ -400,68 +414,84 @@ public interface QuotaManager {
.setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES));
}
- private static RangeSet<Integer> instanceRangeSet(Set<IInstanceTaskConfig> configs) {
- ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder();
- for (IInstanceTaskConfig config : configs) {
- for (IRange range : config.getInstances()) {
- builder.add(Range.closed(range.getFirst(), range.getLast()));
- }
- }
+ private static final Function<IInstanceTaskConfig, ITaskConfig> INSTANCE_CONFIG =
+ new Function<IInstanceTaskConfig, ITaskConfig>() {
+ @Override
+ public ITaskConfig apply(IInstanceTaskConfig config) {
+ return config.getTask();
+ }
+ };
+
+ private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES =
+ new Function<ITaskConfig, IResourceAggregate>() {
+ @Override
+ public IResourceAggregate apply(ITaskConfig config) {
+ return IResourceAggregate.build(new ResourceAggregate()
+ .setNumCpus(config.getNumCpus())
+ .setRamMb(config.getRamMb())
+ .setDiskMb(config.getDiskMb()));
+ }
+ };
+
+ private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES =
+ new Function<IInstanceTaskConfig, IResourceAggregate>() {
+ @Override
+ public IResourceAggregate apply(IInstanceTaskConfig config) {
+ return scale(config.getTask(), getUpdateInstanceCount(config.getInstances()));
+ }
+ };
+
+ private static IResourceAggregate instructionsToResources(
+ Iterable<IInstanceTaskConfig> instructions) {
- return builder.build();
+ return addAll(FluentIterable.from(instructions).transform(INSTANCE_RESOURCES));
}
/**
- * This function calculates max aggregate resources consumed by the job update
+ * Calculates max aggregate resources consumed by the job update
* {@code instructions}. The max is calculated between existing and desired task configs on per
* resource basis. This means max CPU, RAM and DISK values are computed individually and may
* come from different task configurations. While it may not be the most accurate
* representation of job update resources during the update, it does guarantee none of the
* individual resource values is exceeded during the forward/back roll.
- *
+ * <p/>
* NOTE: In case of a job update converting the job production bit (i.e. prod -> non-prod or
* non-prod -> prod), only the matching state is counted towards consumption. For example,
* prod -> non-prod AND {@code prodConsumption=True}: only the initial state is accounted.
- *
- * @param instructions Update instructions with resource definitions.
- * @param isProd Flag indicating whether the prod or non-prod calculation requested.
- * @return Resources consumed by the update.
*/
- private static IResourceAggregate instructionsToResources(
- IJobUpdateInstructions instructions,
- final boolean isProd) {
-
- // Calculate initial state consumption.
- IResourceAggregate initial = EMPTY;
- for (IInstanceTaskConfig group : instructions.getInitialState()) {
- ITaskConfig task = group.getTask();
- if (isProd == task.isProduction()) {
- for (IRange range : group.getInstances()) {
- initial = add(initial, scale(task, instanceCountFromRange(range)));
- }
- }
- }
-
- // Calculate desired state consumption.
- IResourceAggregate desired = Optional.fromNullable(instructions.getDesiredState())
- .transform(new Function<IInstanceTaskConfig, IResourceAggregate>() {
- @Override
- public IResourceAggregate apply(IInstanceTaskConfig input) {
- return isProd == input.getTask().isProduction()
- ? scale(input.getTask(), getUpdateInstanceCount(input.getInstances()))
- : EMPTY;
- }
- }).or(EMPTY);
+ private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources(
+ final Predicate<IInstanceTaskConfig> instanceFilter) {
- // Calculate result as max(existing, desired) per resource type.
- return max(initial, desired);
+ return new Function<IJobUpdateInstructions, IResourceAggregate>() {
+ @Override
+ public IResourceAggregate apply(IJobUpdateInstructions instructions) {
+ Iterable<IInstanceTaskConfig> initialState =
+ Iterables.filter(instructions.getInitialState(), instanceFilter);
+ Iterable<IInstanceTaskConfig> desiredState = Iterables.filter(
+ Optional.fromNullable(instructions.getDesiredState()).asSet(),
+ instanceFilter);
+
+ // Calculate result as max(existing, desired) per resource type.
+ return max(
+ instructionsToResources(initialState),
+ instructionsToResources(desiredState));
+ }
+ };
}
private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) {
- return IResourceAggregate.build(new ResourceAggregate()
- .setNumCpus(a.getNumCpus() + b.getNumCpus())
- .setRamMb(a.getRamMb() + b.getRamMb())
- .setDiskMb(a.getDiskMb() + b.getDiskMb()));
+ return addAll(Arrays.asList(a, b));
+ }
+
+ private static IResourceAggregate addAll(Iterable<IResourceAggregate> aggregates) {
+ IResourceAggregate total = EMPTY;
+ for (IResourceAggregate aggregate : aggregates) {
+ total = IResourceAggregate.build(new ResourceAggregate()
+ .setNumCpus(total.getNumCpus() + aggregate.getNumCpus())
+ .setRamMb(total.getRamMb() + aggregate.getRamMb())
+ .setDiskMb(total.getDiskMb() + aggregate.getDiskMb()));
+ }
+ return total;
}
private static IResourceAggregate subtract(IResourceAggregate a, IResourceAggregate b) {
@@ -479,7 +509,7 @@ public interface QuotaManager {
}
private static IResourceAggregate scale(ITaskConfig taskConfig, int instanceCount) {
- return ResourceAggregates.scale(fromTasks(ImmutableSet.of(taskConfig)), instanceCount);
+ return ResourceAggregates.scale(CONFIG_RESOURCES.apply(taskConfig), instanceCount);
}
private static IResourceAggregate scale(IJobConfiguration jobConfiguration) {
@@ -487,19 +517,7 @@ public interface QuotaManager {
}
private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) {
- double cpu = 0;
- int ramMb = 0;
- int diskMb = 0;
- for (ITaskConfig task : tasks) {
- cpu += task.getNumCpus();
- ramMb += task.getRamMb();
- diskMb += task.getDiskMb();
- }
-
- return IResourceAggregate.build(new ResourceAggregate()
- .setNumCpus(cpu)
- .setRamMb(ramMb)
- .setDiskMb(diskMb));
+ return addAll(Iterables.transform(tasks, CONFIG_RESOURCES));
}
private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
@@ -513,14 +531,10 @@ public interface QuotaManager {
private static int getUpdateInstanceCount(Set<IRange> ranges) {
int instanceCount = 0;
for (IRange range : ranges) {
- instanceCount += instanceCountFromRange(range);
+ instanceCount += range.getLast() - range.getFirst() + 1;
}
return instanceCount;
}
-
- private static int instanceCountFromRange(IRange range) {
- return range.getLast() - range.getFirst() + 1;
- }
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
index b530861..b87ae4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateFactory.java
@@ -19,7 +19,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@@ -147,14 +146,7 @@ interface UpdateFactory {
@VisibleForTesting
static Set<Integer> expandInstanceIds(Set<IInstanceTaskConfig> instanceGroups) {
- ImmutableRangeSet.Builder<Integer> instanceIds = ImmutableRangeSet.builder();
- for (IInstanceTaskConfig group : instanceGroups) {
- for (IRange range : group.getInstances()) {
- instanceIds.add(toRange(range));
- }
- }
-
- return instanceIds.build().asSet(DiscreteDomain.integers());
+ return Updates.getInstanceIds(instanceGroups).asSet(DiscreteDomain.integers());
}
private static Optional<ITaskConfig> getConfig(
http://git-wip-us.apache.org/repos/asf/aurora/blob/4b9c759c/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
index 776278c..6466473 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/Updates.java
@@ -15,13 +15,17 @@ package org.apache.aurora.scheduler.updater;
import java.util.Set;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.JobUpdateSummary;
import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.IRange;
/**
* Utility functions for job updates.
@@ -53,4 +57,22 @@ public final class Updates {
return IJobUpdateSummary.build(mutableSummary);
}
}
+
+ /**
+ * Creates a range set representing all instance IDs represented by a set of instance
+ * configurations included in a job update.
+ *
+ * @param configs Job update components.
+ * @return A range set representing the instance IDs mentioned in instance groupings.
+ */
+ public static ImmutableRangeSet<Integer> getInstanceIds(Set<IInstanceTaskConfig> configs) {
+ ImmutableRangeSet.Builder<Integer> builder = ImmutableRangeSet.builder();
+ for (IInstanceTaskConfig config : configs) {
+ for (IRange range : config.getInstances()) {
+ builder.add(Range.closed(range.getFirst(), range.getLast()));
+ }
+ }
+
+ return builder.build();
+ }
}