You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:41 UTC

[48/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
deleted file mode 100644
index 0ad9e13..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskScheduler.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.google.inject.BindingAnnotation;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.state.StateManager;
-import com.twitter.aurora.scheduler.state.TaskAssigner;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatImpl;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.LOST;
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-
-/**
- * Enables scheduling and preemption of tasks.
- */
-interface TaskScheduler extends EventSubscriber {
-
-  /**
-   * Attempts to schedule a task, possibly performing irreversible actions.
-   *
-   * @param taskId The task to attempt to schedule.
-   * @return SUCCESS if the task was scheduled, TRY_AGAIN otherwise. The caller should call schedule
-   * again if TRY_AGAIN is returned.
-   */
-  TaskSchedulerResult schedule(String taskId);
-
-  enum TaskSchedulerResult {
-    SUCCESS,
-    TRY_AGAIN
-  }
-
-  /**
-   * An asynchronous task scheduler.  Scheduling of tasks is performed on a delay, where each task
-   * backs off after a failed scheduling attempt.
-   * <p>
-   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
-   */
-  class TaskSchedulerImpl implements TaskScheduler {
-    /**
-     * Binding annotation for the time duration of reservations
-     */
-    @BindingAnnotation
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    @interface ReservationDuration { }
-
-    private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());
-
-    private final Storage storage;
-    private final StateManager stateManager;
-    private final TaskAssigner assigner;
-    private final OfferQueue offerQueue;
-    private final Preemptor preemptor;
-    private final Reservations reservations;
-
-    private final AtomicLong scheduleAttemptsFired = Stats.exportLong("schedule_attempts_fired");
-    private final AtomicLong scheduleAttemptsFailed = Stats.exportLong("schedule_attempts_failed");
-
-    @Inject
-    TaskSchedulerImpl(
-        Storage storage,
-        StateManager stateManager,
-        TaskAssigner assigner,
-        OfferQueue offerQueue,
-        Preemptor preemptor,
-        @ReservationDuration Amount<Long, Time> reservationDuration,
-        final Clock clock) {
-
-      this.storage = checkNotNull(storage);
-      this.stateManager = checkNotNull(stateManager);
-      this.assigner = checkNotNull(assigner);
-      this.offerQueue = checkNotNull(offerQueue);
-      this.preemptor = checkNotNull(preemptor);
-      this.reservations = new Reservations(reservationDuration, clock);
-    }
-
-    /**
-     * Builds the function used to try to match a single offer against the given task, taking
-     * slave reservations into account.
-     *
-     * @param taskId ID of the task being scheduled.
-     * @param task The task being scheduled.
-     * @return A function that yields the assignment for an offer, or absent when the offer's
-     *         slave is reserved for a different task or the assigner declines the offer.
-     */
-    private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
-        final String taskId,
-        final IScheduledTask task) {
-
-      return new Function<Offer, Optional<TaskInfo>>() {
-        @Override public Optional<TaskInfo> apply(Offer offer) {
-          Optional<String> reservedFor = reservations.getSlaveReservation(offer.getSlaveId());
-          boolean reservedForOtherTask =
-              reservedFor.isPresent() && !taskId.equals(reservedFor.get());
-          if (reservedForOtherTask) {
-            // Slave is being held for a different task, leave it alone.
-            return Optional.absent();
-          }
-
-          // Slave is either unreserved or reserved for this very task.
-          return assigner.maybeAssign(offer, task);
-        }
-      };
-    }
-
-    @VisibleForTesting
-    static final Optional<String> LAUNCH_FAILED_MSG =
-        Optional.of("Unknown exception attempting to schedule task.");
-
-    /**
-     * Makes one scheduling attempt for the task inside a storage write operation.
-     *
-     * @param taskId The task to attempt to schedule.
-     * @return SUCCESS when the task was launched, when it could not be found in PENDING, or when
-     *         the launch failed and the task was moved to LOST.  TRY_AGAIN when no offer accepted
-     *         the task or an unchecked exception was thrown during the attempt.
-     */
-    @Timed("task_schedule_attempt")
-    @Override
-    public TaskSchedulerResult schedule(final String taskId) {
-      scheduleAttemptsFired.incrementAndGet();
-      try {
-        return storage.write(new MutateWork.Quiet<TaskSchedulerResult>() {
-          @Override public TaskSchedulerResult apply(MutableStoreProvider store) {
-            LOG.fine("Attempting to schedule task " + taskId);
-            Query.Builder pendingTaskQuery = Query.taskScoped(taskId).byStatus(PENDING);
-            final IScheduledTask pendingTask =
-                Iterables.getOnlyElement(store.getTaskStore().fetchTasks(pendingTaskQuery), null);
-            if (pendingTask == null) {
-              // Nothing is left to schedule, so the caller has no reason to retry.
-              LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
-              return TaskSchedulerResult.SUCCESS;
-            }
-
-            try {
-              if (offerQueue.launchFirst(getAssignerFunction(taskId, pendingTask))) {
-                return TaskSchedulerResult.SUCCESS;
-              }
-
-              // Task could not be scheduled.
-              maybePreemptFor(taskId);
-              return TaskSchedulerResult.TRY_AGAIN;
-            } catch (OfferQueue.LaunchException e) {
-              LOG.log(Level.WARNING, "Failed to launch task.", e);
-              scheduleAttemptsFailed.incrementAndGet();
-
-              // The attempt to schedule the task failed, so we need to backpedal on the
-              // assignment.
-              // It is in the LOST state and a new task will move to PENDING to replace it.
-              // Should the state change fail due to storage issues, that's okay.  The task will
-              // time out in the ASSIGNED state and be moved to LOST.
-              stateManager.changeState(pendingTaskQuery, LOST, LAUNCH_FAILED_MSG);
-              return TaskSchedulerResult.SUCCESS;
-            }
-          }
-        });
-      } catch (RuntimeException e) {
-        // We catch the generic unchecked exception here to ensure tasks are not abandoned
-        // if there is a transient issue resulting in an unchecked exception.
-        LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
-        scheduleAttemptsFailed.incrementAndGet();
-        return TaskSchedulerResult.TRY_AGAIN;
-      }
-    }
-
-    /**
-     * Asks the preemptor for a slot for the task and, when one is found, reserves that slave for
-     * the task.  Does nothing if the task already holds a reservation.
-     *
-     * @param taskId ID of the task that could not be scheduled.
-     */
-    private void maybePreemptFor(String taskId) {
-      if (!reservations.hasReservationForTask(taskId)) {
-        Optional<String> preemptionSlot = preemptor.findPreemptionSlotFor(taskId);
-        if (preemptionSlot.isPresent()) {
-          SlaveID reservedSlave = SlaveID.newBuilder().setValue(preemptionSlot.get()).build();
-          reservations.add(reservedSlave, taskId);
-        }
-      }
-    }
-
-    /**
-     * Drops any slave reservation held for a task once that task leaves the PENDING state.
-     *
-     * @param stateChangeEvent The observed task state transition.
-     */
-    @Subscribe
-    public void taskChanged(final TaskStateChange stateChangeEvent) {
-      if (stateChangeEvent.getOldState() != PENDING) {
-        return;
-      }
-
-      reservations.invalidateTask(stateChangeEvent.getTaskId());
-    }
-
-    /**
-     * Tracks which slaves are being held for which tasks.  Each reservation is kept in a cache
-     * keyed by slave ID and expires once the configured duration has passed since it was written,
-     * as measured by the supplied clock.
-     */
-    private static class Reservations {
-      private final Cache<SlaveID, String> reservations;
-
-      /**
-       * @param duration How long a reservation is retained after being added.
-       * @param clock Time source used to drive cache expiration.
-       */
-      Reservations(final Amount<Long, Time> duration, final Clock clock) {
-        checkNotNull(duration);
-        checkNotNull(clock);
-        this.reservations = CacheBuilder.newBuilder()
-            // Expire with millisecond granularity.  A whole number of minutes cannot represent a
-            // sub-minute duration (e.g. 30 seconds), which would silently change the configured
-            // reservation lifetime.
-            .expireAfterWrite(duration.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)
-            .ticker(new Ticker() {
-              @Override public long read() {
-                return clock.nowNanos();
-              }
-            })
-            .build();
-        Stats.export(new StatImpl<Long>("reservation_cache_size") {
-          @Override public Long read() {
-            return reservations.size();
-          }
-        });
-      }
-
-      /** Reserves the slave for the task, replacing any reservation already held on the slave. */
-      private synchronized void add(SlaveID slaveId, String taskId) {
-        reservations.put(slaveId, taskId);
-      }
-
-      /** Returns whether any slave is currently reserved for the task. */
-      private synchronized boolean hasReservationForTask(String taskId) {
-        return reservations.asMap().containsValue(taskId);
-      }
-
-      /** Returns the ID of the task the slave is reserved for, if any. */
-      private synchronized Optional<String> getSlaveReservation(SlaveID slaveID) {
-        return Optional.fromNullable(reservations.getIfPresent(slaveID));
-      }
-
-      /** Removes a reservation held for the task, if one exists. */
-      private synchronized void invalidateTask(String taskId) {
-        reservations.asMap().values().remove(taskId);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java
deleted file mode 100644
index 19848c7..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskTimeout.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.eventbus.Subscribe;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.state.StateManager;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
- * tasks will be transitioned to the LOST state.
- */
-class TaskTimeout implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName());
-
-  @VisibleForTesting
-  static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
-
-  @VisibleForTesting
-  static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
-
-  @VisibleForTesting
-  static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
-
-  @VisibleForTesting
-  static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of(
-      ScheduleStatus.ASSIGNED,
-      ScheduleStatus.PREEMPTING,
-      ScheduleStatus.RESTARTING,
-      ScheduleStatus.KILLING);
-
-  @VisibleForTesting
-  static final Query.Builder TRANSIENT_QUERY = Query.unscoped().byStatus(TRANSIENT_STATES);
-
-  private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
-
-  /**
-   * Map key identifying a pending timeout: the task together with the state it was seen in.
-   */
-  private static final class TimeoutKey {
-    private final String taskId;
-    private final ScheduleStatus status;
-
-    private TimeoutKey(String taskId, ScheduleStatus status) {
-      this.taskId = taskId;
-      this.status = status;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(taskId, status);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof TimeoutKey) {
-        TimeoutKey other = (TimeoutKey) o;
-        return (status == other.status) && Objects.equal(taskId, other.taskId);
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() {
-      return new StringBuilder().append(taskId).append(":").append(status).toString();
-    }
-  }
-
-  private final Storage storage;
-  private final ScheduledExecutorService executor;
-  private final StateManager stateManager;
-  private final long timeoutMillis;
-  private final Clock clock;
-  private final AtomicLong timedOutTasks;
-
-  @Inject
-  TaskTimeout(
-      Storage storage,
-      ScheduledExecutorService executor,
-      StateManager stateManager,
-      final Clock clock,
-      Amount<Long, Time> timeout,
-      StatsProvider statsProvider) {
-
-    this.storage = checkNotNull(storage);
-    this.executor = checkNotNull(executor);
-    this.stateManager = checkNotNull(stateManager);
-    this.timeoutMillis = timeout.as(Time.MILLISECONDS);
-    this.clock = checkNotNull(clock);
-    this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
-
-    exportStats(statsProvider);
-  }
-
-  /**
-   * Schedules a timeout handler for the key unless one is already being tracked.
-   *
-   * @param key Task and transient state to watch.
-   */
-  private void registerTimeout(TimeoutKey key) {
-    // The containsKey/put pair below is a deliberate check-then-act:
-    //   - the Future has to exist before it can be stored, so there is no simpler alternative
-    //   - a collision only occurs if something outside this class misbehaves (the same state
-    //     being announced twice)
-    //   - the worst case is a redundant Future that eventually runs and does nothing
-    if (futures.containsKey(key)) {
-      return;
-    }
-
-    Runnable handler = new TimedOutTaskHandler(key);
-    Future<?> pendingTimeout = executor.schedule(handler, timeoutMillis, TimeUnit.MILLISECONDS);
-    futures.put(key, new Context(clock.nowMillis(), pendingTimeout));
-  }
-
-  private static boolean isTransient(ScheduleStatus status) {
-    return TRANSIENT_STATES.contains(status);
-  }
-
-  /**
-   * Cancels the timeout tracked for the state a task just left, and starts a new one if the
-   * state it entered is transient.
-   *
-   * @param change The observed task state transition.
-   */
-  @Subscribe
-  public void recordStateChange(TaskStateChange change) {
-    String taskId = change.getTaskId();
-    ScheduleStatus newState = change.getNewState();
-    ScheduleStatus oldState = change.getOldState();
-
-    if (isTransient(oldState)) {
-      TimeoutKey oldKey = new TimeoutKey(taskId, oldState);
-      Context previous = futures.remove(oldKey);
-      if (previous != null) {
-        LOG.fine("Canceling state timeout for task " + oldKey);
-        previous.future.cancel(false);
-      }
-    }
-
-    if (isTransient(newState)) {
-      registerTimeout(new TimeoutKey(taskId, newState));
-    }
-  }
-
-  /**
-   * Registers a timeout for every stored task that is currently in a transient state.
-   *
-   * @param event Notification that storage has started.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task : Storage.Util.consistentFetchTasks(storage, TRANSIENT_QUERY)) {
-      TimeoutKey key = new TimeoutKey(Tasks.id(task), task.getStatus());
-      registerTimeout(key);
-    }
-  }
-
-  /**
-   * Runs when a timeout elapses: moves the task to LOST if it is still in the state the timeout
-   * was registered for, and always stops tracking the timeout afterwards.
-   */
-  private class TimedOutTaskHandler implements Runnable {
-    private final TimeoutKey key;
-
-    TimedOutTaskHandler(TimeoutKey key) {
-      this.key = key;
-    }
-
-    @Override public void run() {
-      Context context = futures.get(key);
-      try {
-        if (context != null) {
-          timeOutTask();
-        } else {
-          LOG.warning("Timeout context not found for " + key);
-        }
-      } finally {
-        // Whatever happened above, this timeout has fired and is no longer pending.
-        futures.remove(key);
-      }
-    }
-
-    /** Attempts the LOST transition and records whether it took effect. */
-    private void timeOutTask() {
-      LOG.info("Timeout reached for task " + key);
-      // This query acts as a CAS by including the state that we expect the task to be in if the
-      // timeout is still valid.  Ideally, the future would have already been canceled, but in the
-      // event of a state transition race, including transientState prevents an unintended
-      // task timeout.
-      Query.Builder expectedStateQuery = Query.taskScoped(key.taskId).byStatus(key.status);
-      // Note: This requires LOST transitions trigger Driver.killTask.
-      int changed = stateManager.changeState(expectedStateQuery, ScheduleStatus.LOST, TIMEOUT_MESSAGE);
-      if (changed > 0) {
-        timedOutTasks.incrementAndGet();
-      } else {
-        LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
-      }
-    }
-  }
-
-  /**
-   * Bookkeeping for one pending timeout: when it was registered and the handle used to cancel it.
-   * Declared static because it uses nothing from the enclosing instance, so it should not carry
-   * an implicit reference to it.
-   */
-  private static final class Context {
-    private final long timestampMillis;
-    private final Future<?> future;
-
-    /**
-     * @param timestampMillis Time at which the timeout was registered, in milliseconds.
-     * @param future Handle to the scheduled timeout handler.
-     */
-    Context(long timestampMillis, Future<?> future) {
-      this.timestampMillis = timestampMillis;
-      this.future = future;
-    }
-  }
-
-  private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
-    @Override public Long apply(Context context) {
-      return context.timestampMillis;
-    }
-  };
-
-  private static final Ordering<Context> TIMESTAMP_ORDER =
-      Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
-
-  @VisibleForTesting
-  static String waitingTimeStatName(ScheduleStatus status) {
-    return "scheduler_max_" + status + "_waiting_ms";
-  }
-
-  /**
-   * Exports a gauge with the number of tracked timeouts, plus one gauge per transient state
-   * reporting how long the oldest tracked task has been waiting in that state (0 when none).
-   *
-   * @param statsProvider Provider used to register the gauges.
-   */
-  private void exportStats(StatsProvider statsProvider) {
-    statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
-      @Override public Number get() {
-        return futures.size();
-      }
-    });
-
-    for (final ScheduleStatus status : TRANSIENT_STATES) {
-      statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
-        @Override public Number get() {
-          // Find the oldest matching entry in a single pass.  The map is modified concurrently
-          // by other threads, so testing a live view for emptiness and then asking for its
-          // minimum can fail if the last matching entry disappears between the two steps.
-          boolean found = false;
-          long oldestMillis = Long.MAX_VALUE;
-          for (Map.Entry<TimeoutKey, Context> entry : futures.entrySet()) {
-            if (entry.getKey().status == status) {
-              found = true;
-              oldestMillis = Math.min(oldestMillis, entry.getValue().timestampMillis);
-            }
-          }
-
-          return found ? clock.nowMillis() - oldestMillis : 0L;
-        }
-      });
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java b/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java
deleted file mode 100644
index b11c683..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/CommandUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-
-import com.twitter.common.base.MorePreconditions;
-
-/**
- * Utility class for constructing {@link CommandInfo} objects given an executor URI.
- */
-public final class CommandUtil {
-
-  private CommandUtil() {
-    // Utility class.
-  }
-
-  /**
-   * Extracts the portion of the URI following its last slash.
-   *
-   * @param uri URI to extract the basename from.
-   * @return The text after the last slash, or the whole URI when it contains no slash.
-   */
-  private static String uriBasename(String uri) {
-    int lastSlash = uri.lastIndexOf('/');
-    if (lastSlash < 0) {
-      return uri;
-    }
-
-    String basename = uri.substring(lastSlash + 1);
-    MorePreconditions.checkNotBlank(basename, "URI must not end with a slash.");
-    return basename;
-  }
-
-  /**
-   * Builds a command description that fetches the executor binary at the given URI, marks it
-   * executable, and runs it by its basename from the current directory.
-   *
-   * @param executorUri URI to the executor.
-   * @return A command that will fetch and execute the executor.
-   */
-  public static CommandInfo create(String executorUri) {
-    MorePreconditions.checkNotBlank(executorUri);
-
-    CommandInfo.Builder command = CommandInfo.newBuilder();
-    command.addUris(URI.newBuilder().setValue(executorUri).setExecutable(true));
-    command.setValue("./" + uriBasename(executorUri));
-    return command.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java b/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java
deleted file mode 100644
index 2f84b5c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Conversions.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.TaskState;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.ScheduleStatus;
-
-/**
- * Collection of utility functions to convert mesos protobuf types to internal thrift types.
- */
-public final class Conversions {
-
-  private static final Logger LOG = Logger.getLogger(Conversions.class.getName());
-
-  private Conversions() {
-    // Utility class.
-  }
-
-  // Maps from mesos state to scheduler interface state.
-  private static final Map<TaskState, ScheduleStatus> STATE_TRANSLATION =
-      new ImmutableMap.Builder<TaskState, ScheduleStatus>()
-          .put(TaskState.TASK_STARTING, ScheduleStatus.STARTING)
-          .put(TaskState.TASK_RUNNING, ScheduleStatus.RUNNING)
-          .put(TaskState.TASK_FINISHED, ScheduleStatus.FINISHED)
-          .put(TaskState.TASK_FAILED, ScheduleStatus.FAILED)
-          .put(TaskState.TASK_KILLED, ScheduleStatus.KILLED)
-          .put(TaskState.TASK_LOST, ScheduleStatus.LOST)
-          .build();
-
-  /**
-   * Translates a mesos task state into the scheduler's own status type.
-   *
-   * @param taskState Protobuf state.
-   * @return Equivalent thrift-generated state.
-   * @throws IllegalArgumentException If no translation exists for the state.
-   */
-  public static ScheduleStatus convertProtoState(TaskState taskState) {
-    ScheduleStatus translated = STATE_TRANSLATION.get(taskState);
-    if (translated == null) {
-      throw new IllegalArgumentException("Unrecognized task state " + taskState);
-    }
-    return translated;
-  }
-
-  private static final Function<Protos.Attribute, String> ATTRIBUTE_NAME =
-      new Function<Protos.Attribute, String>() {
-        @Override public String apply(Protos.Attribute attr) {
-          return attr.getName();
-        }
-      };
-
-  /**
-   * Typedef to make anonymous implementation more concise.
-   */
-  private abstract static class AttributeConverter
-      implements Function<Entry<String, Collection<Protos.Attribute>>, Attribute> {
-  }
-
-  /**
-   * Renders the value of a protobuf attribute as a string.  Scalar and text attributes are
-   * supported; any other attribute type yields {@code null} so that callers can filter it out.
-   */
-  private static final Function<Protos.Attribute, String> VALUE_CONVERTER =
-      new Function<Protos.Attribute, String>() {
-        @Override public String apply(Protos.Attribute attribute) {
-          switch (attribute.getType()) {
-            case SCALAR:
-              return String.valueOf(attribute.getScalar().getValue());
-
-            case TEXT:
-              return attribute.getText().getValue();
-
-            default:
-              // Unsupported types are reported at the finest log level and skipped.
-              LOG.finest("Unrecognized attribute type:" + attribute.getType() + " , ignoring.");
-              return null;
-          }
-        }
-      };
-
-  private static final AttributeConverter ATTRIBUTE_CONVERTER = new AttributeConverter() {
-    @Override public Attribute apply(Entry<String, Collection<Protos.Attribute>> entry) {
-      // Convert values and filter any that were ignored.
-      return new Attribute(
-          entry.getKey(),
-          FluentIterable.from(entry.getValue())
-              .transform(VALUE_CONVERTER)
-              .filter(Predicates.notNull())
-              .toSet());
-    }
-  };
-
-  /**
-   * Converts protobuf attributes into thrift-generated attributes, tagged with the offer's
-   * hostname and slave ID.
-   *
-   * @param offer Resource offer.
-   * @return Equivalent thrift host attributes.
-   */
-  public static HostAttributes getAttributes(Offer offer) {
-    // Group by attribute name.
-    Multimap<String, Protos.Attribute> valuesByName =
-        Multimaps.index(offer.getAttributesList(), ATTRIBUTE_NAME);
-
-    // One thrift attribute is produced per distinct name; the slave ID is attached last.
-    return new HostAttributes(
-        offer.getHostname(),
-        FluentIterable.from(valuesByName.asMap().entrySet())
-            .transform(ATTRIBUTE_CONVERTER)
-            .toSet())
-        .setSlaveId(offer.getSlaveId().getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java b/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java
deleted file mode 100644
index 008e1cb..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/JobKeys.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-
-import com.twitter.aurora.gen.JobKey;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Utility class providing convenience functions relating to JobKeys.
- */
-public final class JobKeys {
-  private JobKeys() {
-    // Utility class.
-  }
-
-  public static final Function<IJobConfiguration, IJobKey> FROM_CONFIG =
-      new Function<IJobConfiguration, IJobKey>() {
-        @Override public IJobKey apply(IJobConfiguration job) {
-          return job.getKey();
-        }
-      };
-
-  public static final Function<IJobKey, String> TO_ROLE =
-      new Function<IJobKey, String>() {
-        @Override public String apply(IJobKey jobKey) {
-          return jobKey.getRole();
-        }
-      };
-
-  public static final Function<IJobKey, String> TO_ENVIRONMENT =
-      new Function<IJobKey, String>() {
-        @Override public String apply(IJobKey jobKey) {
-          return jobKey.getEnvironment();
-        }
-      };
-
-  public static final Function<IJobKey, String> TO_JOB_NAME =
-      new Function<IJobKey, String>() {
-        @Override public String apply(IJobKey jobKey) {
-          return jobKey.getName();
-        }
-      };
-
-  public static final Function<IJobConfiguration, String> CONFIG_TO_ROLE =
-      Functions.compose(TO_ROLE, FROM_CONFIG);
-
-  /**
-   * Determine whether a jobKey struct is fully specified.
-   *
-   * @param jobKey The jobKey to validate, possibly {@code null}.
-   * @return {@code true} only for a non-null key with non-empty role, environment and name.
-   */
-  public static boolean isValid(@Nullable IJobKey jobKey) {
-    return !(jobKey == null
-        || Strings.isNullOrEmpty(jobKey.getRole())
-        || Strings.isNullOrEmpty(jobKey.getEnvironment())
-        || Strings.isNullOrEmpty(jobKey.getName()));
-  }
-
-  /**
-   * Assert that a jobKey struct is valid, as defined by {@link #isValid(IJobKey)}.
-   *
-   * @param jobKey The key struct to validate.
-   * @return The validated jobKey argument, to allow inline use.
-   * @throws IllegalArgumentException if the key struct fails to validate (including null).
-   */
-  public static IJobKey assertValid(IJobKey jobKey) throws IllegalArgumentException {
-    // Name the offending key so a failure can be diagnosed from the message alone.
-    checkArgument(isValid(jobKey), "Invalid job key: %s", jobKey);
-    return jobKey;
-  }
-
-  /**
-   * Attempt to create a valid JobKey from the given (role, environment, name) triple.
-   *
-   * @param role The job's role.
-   * @param environment The job's environment.
-   * @param name The job's name.
-   * @return A valid JobKey if it can be created.
-   * @throws IllegalArgumentException if the key fails to validate.
-   */
-  public static IJobKey from(String role, String environment, String name)
-      throws IllegalArgumentException {
-    // Populate the generated struct field by field, then validate the wrapped key.
-    JobKey mutableKey = new JobKey();
-    mutableKey.setRole(role);
-    mutableKey.setEnvironment(environment);
-    mutableKey.setName(name);
-    return assertValid(IJobKey.build(mutableKey));
-  }
-
-  /**
-   * Attempts to create a valid JobKey from the given task.
-   *
-   * @param task The task to create job key from.
-   * @return A valid JobKey if it can be created.
-   * @throws IllegalArgumentException if the key fails to validate.
-   */
-  public static IJobKey from(ITaskConfig task) throws IllegalArgumentException {
-    return from(task.getOwner().getRole(), task.getEnvironment(), task.getJobName());
-  }
-
-  /**
-   * Create a "/"-delimited String representation of a job key, suitable for logging but not
-   * necessarily suitable for use as a unique identifier.
-   *
-   * @param jobKey Key to represent.
-   * @return "/"-delimited representation of the key.
-   */
-  public static String toPath(IJobKey jobKey) {
-    return jobKey.getRole() + "/" + jobKey.getEnvironment() + "/" + jobKey.getName();
-  }
-
-  /**
-   * Create a "/"-delimited String representation of job key, suitable for logging but not
-   * necessarily suitable for use as a unique identifier.
-   *
-   * @param job Job to represent.
-   * @return "/"-delimited representation of the job's key.
-   */
-  public static String toPath(IJobConfiguration job) {
-    return toPath(job.getKey());
-  }
-
-  /**
-   * Attempt to extract a job key from the given query if it is scoped to a single job.
-   *
-   * @param query Query to extract the key from.
-   * @return The job key if the query is job scoped, absent otherwise.
-   */
-  public static Optional<IJobKey> from(Query.Builder query) {
-    // Guard clause: only a job-scoped query carries the role, environment and job name.
-    if (!Query.isJobScoped(query)) {
-      return Optional.absent();
-    }
-
-    TaskQuery scoped = query.get();
-    IJobKey key = from(scoped.getOwner().getRole(), scoped.getEnvironment(), scoped.getJobName());
-    return Optional.of(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java b/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java
deleted file mode 100644
index 74b5e0b..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Numbers.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
-/**
- * Utility class for working with numbers.
- */
-public final class Numbers {
-
-  private Numbers() {
-    // Utility class.
-  }
-
-  /**
-   * Converts a set of integers into a set of contiguous closed ranges that equally represent the
-   * input integers.
-   * <p>
-   * The resulting ranges will be in ascending order.
-   *
-   * @param values Values to transform to ranges.
-   * @return Closed ranges with identical members to the input set.
-   */
-  public static Set<Range<Integer>> toRanges(Iterable<Integer> values) {
-    // Sorting and de-duplicating up front lets one forward pass find every contiguous run.
-    PeekingIterator<Integer> sorted =
-        Iterators.peekingIterator(Sets.newTreeSet(values).iterator());
-
-    ImmutableSet.Builder<Range<Integer>> ranges = ImmutableSet.builder();
-    while (sorted.hasNext()) {
-      // The next unconsumed value opens a new range.
-      int lower = sorted.next();
-      int upper = lower;
-
-      // Absorb successors for as long as each is exactly one greater than the current bound.
-      while (sorted.hasNext() && (sorted.peek() == (upper + 1))) {
-        upper = sorted.next();
-      }
-
-      // The run is broken (or the input is exhausted), so the range is complete.
-      ranges.add(Range.closed(lower, upper));
-    }
-
-    return ranges.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Query.java b/src/main/java/com/twitter/aurora/scheduler/base/Query.java
deleted file mode 100644
index d02ef87..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Query.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.EnumSet;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.primitives.Ints;
-
-import com.twitter.aurora.gen.Identity;
-import com.twitter.aurora.gen.InstanceKey;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.TaskQuery;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static org.apache.commons.lang.StringUtils.isEmpty;
-
-/**
- * A utility class to construct storage queries.
- * TODO(Sathya): Add some basic unit tests for isJobScoped and isOnlyJobScoped.
- */
-public final class Query {
-
-  private Query() {
-    // Utility.
-  }
-
-  /**
-   * Checks whether a query is scoped to a specific job.
-   * A query scoped to a job specifies a role, an environment and a job name.
-   *
-   * @param taskQuery Query to test.
-   * @return {@code true} if the query specifies a non-empty role, environment and job name,
-   *         otherwise {@code false}.
-   */
-  public static boolean isJobScoped(Builder taskQuery) {
-    TaskQuery query = taskQuery.get();
-    Identity owner = query.getOwner();
-    return !(owner == null || isEmpty(owner.getRole())
-        || isEmpty(query.getEnvironment())
-        || isEmpty(query.getJobName()));
-  }
-
-  /**
-   * Checks whether a query is strictly scoped to a specific job. A query is strictly job scoped,
-   * iff it has the role, environment and jobName set.
-   *
-   * @param query Query to test.
-   * @return {@code true} if the query is strictly job scoped, otherwise {@code false}.
-   */
-  public static boolean isOnlyJobScoped(Builder query) {
-    Optional<IJobKey> jobKey = JobKeys.from(query);
-    return jobKey.isPresent() && Query.jobScoped(jobKey.get()).equals(query);
-  }
-
-  public static Builder arbitrary(TaskQuery query) {
-    return new Builder(query.deepCopy());
-  }
-
-  public static Builder unscoped() {
-    return new Builder();
-  }
-
-  public static Builder roleScoped(String role) {
-    return unscoped().byRole(role);
-  }
-
-  public static Builder envScoped(String role, String environment) {
-    return unscoped().byEnv(role, environment);
-  }
-
-  public static Builder jobScoped(IJobKey jobKey) {
-    return unscoped().byJob(jobKey);
-  }
-
-  public static Builder instanceScoped(InstanceKey instanceKey) {
-    return instanceScoped(IJobKey.build(instanceKey.getJobKey()), instanceKey.getInstanceId());
-  }
-
-  public static Builder instanceScoped(IJobKey jobKey, int instanceId, int... instanceIds) {
-    return unscoped().byInstances(jobKey, instanceId, instanceIds);
-  }
-
-  public static Builder instanceScoped(IJobKey jobKey, Iterable<Integer> instanceIds) {
-    return unscoped().byInstances(jobKey, instanceIds);
-  }
-
-  public static Builder taskScoped(String taskId, String... taskIds) {
-    return unscoped().byId(taskId, taskIds);
-  }
-
-  public static Builder taskScoped(Iterable<String> taskIds) {
-    return unscoped().byId(taskIds);
-  }
-
-  public static Builder slaveScoped(String slaveHost) {
-    return unscoped().bySlave(slaveHost);
-  }
-
-  public static Builder statusScoped(ScheduleStatus status, ScheduleStatus... statuses) {
-    return unscoped().byStatus(status, statuses);
-  }
-
-  public static Builder statusScoped(Iterable<ScheduleStatus> statuses) {
-    return unscoped().byStatus(statuses);
-  }
-
-  /**
-   * A Builder of TaskQueries. Builders are immutable and provide access to a set of convenience
-   * methods to return a new builder of another scope. Available scope filters include slave,
-   * taskId, role, jobs of a role, and instances of a job.
-   *
-   * <p>
-   * This class does not expose the full functionality of TaskQuery but rather subsets of it that
-   * can be efficiently executed and make sense in the context of the scheduler datastores. This
-   * builder should be preferred over constructing TaskQueries directly.
-   * </p>
-   *
-   * An environment scope is available through {@link #byEnv(String, String)}.
-   */
-  public static final class Builder implements Supplier<TaskQuery> {
-    private final TaskQuery query;
-
-    private Builder() {
-      this.query = new TaskQuery();
-    }
-
-    private Builder(final TaskQuery query) {
-      this.query = checkNotNull(query); // It is expected that the caller calls deepCopy.
-    }
-
-    /**
-     * Build a query that is the combination of all the filters applied to a Builder. Mutating the
-     * returned object will not affect the state of the builder. Can be called any number of times
-     * and will return a new {@code TaskQuery} each time.
-     *
-     * @return A new TaskQuery satisfying this builder's constraints.
-     */
-    @Override
-    public TaskQuery get() {
-      return query.deepCopy();
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      return that != null
-          && that instanceof Builder
-          && Objects.equal(query, ((Builder) that).query);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(query);
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("query", query)
-          .toString();
-    }
-
-    /**
-     * Create a builder scoped to tasks.
-     *
-     * @param taskId An ID of a task to scope the builder to.
-     * @param taskIds Additional IDs of tasks to scope the builder to (they are ORed together).
-     * @return A new Builder scoped to the given tasks.
-     */
-    public Builder byId(String taskId, String... taskIds) {
-      checkNotNull(taskId);
-
-      return new Builder(
-          query.deepCopy()
-              .setTaskIds(ImmutableSet.<String>builder().add(taskId).add(taskIds).build()));
-    }
-
-    /**
-     * Create a builder scoped to tasks.
-     *
-     * @see #byId(String, String...)
-     *
-     * @param taskIds The IDs of the tasks to scope the query to (ORed together).
-     * @return A new Builder scoped to the given tasks.
-     */
-    public Builder byId(Iterable<String> taskIds) {
-      checkNotNull(taskIds);
-
-      return new Builder(
-          query.deepCopy().setTaskIds(ImmutableSet.copyOf(taskIds)));
-    }
-
-    /**
-     * Create a builder scoped to a role. A role scope conflicts with job and instance scopes.
-     *
-     * @param role The role to scope the query to.
-     * @return A new Builder scoped to the given role.
-     */
-    public Builder byRole(String role) {
-      checkNotNull(role);
-
-      return new Builder(
-          query.deepCopy().setOwner(new Identity().setRole(role)));
-    }
-
-    /**
-     * Create a builder scoped to an environment. An environment scope conflicts with role, job,
-     * and instance scopes.
-     *
-     * @param role The role to scope the query to.
-     * @param environment The environment to scope the query to.
-     * @return A new Builder scoped to the given environment.
-     */
-    public Builder byEnv(String role, String environment) {
-      checkNotNull(role);
-      checkNotNull(environment);
-
-      return new Builder(
-          query.deepCopy()
-              .setOwner(new Identity().setRole(role))
-              .setEnvironment(environment));
-    }
-
-    /**
-     * Returns a new builder scoped to the job uniquely identified by the given key. A job scope
-     * conflicts with role and instance scopes.
-     *
-     * @param jobKey The key of the job to scope the query to.
-     * @return A new Builder scoped to the given jobKey.
-     */
-    public Builder byJob(IJobKey jobKey) {
-      JobKeys.assertValid(jobKey);
-
-      return new Builder(
-          query.deepCopy()
-              .setOwner(new Identity().setRole(jobKey.getRole()))
-              .setEnvironment(jobKey.getEnvironment())
-              .setJobName(jobKey.getName()));
-    }
-
-    /**
-     * Returns a new builder scoped to the slave uniquely identified by the given slaveHost. A
-     * builder can only be scoped to slaves once.
-     *
-     * @param slaveHost The hostname of the slave to scope the query to.
-     * @return A new Builder scoped to the given slave.
-     */
-    public Builder bySlave(String slaveHost) {
-      checkNotNull(slaveHost);
-
-      return new Builder(query.deepCopy().setSlaveHost(slaveHost));
-    }
-
-    /**
-     * Returns a new builder scoped to the given statuses. A builder can only be scoped to statuses
-     * once.
-     *
-     * @param status The status to scope this Builder to.
-     * @param statuses Additional statuses to scope this Builder to (they are ORed together).
-     * @return A new Builder scoped to the given statuses.
-     */
-    public Builder byStatus(ScheduleStatus status, ScheduleStatus... statuses) {
-      checkNotNull(status);
-
-      return new Builder(
-          query.deepCopy().setStatuses(EnumSet.of(status, statuses)));
-    }
-
-    /**
-     * Create a new Builder scoped to statuses. An empty input produces an empty status set.
-     *
-     * @see Builder#byStatus(ScheduleStatus, ScheduleStatus...)
-     *
-     * @param statuses The statuses to scope this Builder to; must not be null, may be empty.
-     * @return A new Builder scoped to the given statuses.
-     */
-    public Builder byStatus(Iterable<ScheduleStatus> statuses) {
-      // EnumSet.copyOf(Collection) throws on an empty non-EnumSet input, so special-case it.
-      ImmutableSet<ScheduleStatus> given = ImmutableSet.copyOf(checkNotNull(statuses));
-      return new Builder(query.deepCopy().setStatuses(
-          given.isEmpty() ? EnumSet.noneOf(ScheduleStatus.class) : EnumSet.copyOf(given)));
-    }
-
-    /**
-     * Returns a new Builder scoped to the given instances of the given job. A builder can only
-     * be scoped to a set of instances, a job, or a role once.
-     *
-     * @param jobKey The key identifying the job.
-     * @param instanceId An instance id of the target job.
-     * @param instanceIds Additional instance ids of the target job.
-     * @return A new Builder scoped to the given instance ids.
-     */
-    public Builder byInstances(IJobKey jobKey, int instanceId, int... instanceIds) {
-      JobKeys.assertValid(jobKey);
-
-      return new Builder(
-          query.deepCopy()
-              .setOwner(new Identity().setRole(jobKey.getRole()))
-              .setEnvironment(jobKey.getEnvironment())
-              .setJobName(jobKey.getName())
-              .setInstanceIds(ImmutableSet.<Integer>builder()
-                  .add(instanceId)
-                  .addAll(Ints.asList(instanceIds))
-                  .build()));
-    }
-
-    /**
-     * Create a new Builder scoped to instances.
-     *
-     * @see Builder#byInstances
-     *
-     * @param jobKey The key identifying the job.
-     * @param instanceIds Instances of the target job.
-     * @return A new Builder scoped to the given instance ids.
-     */
-    public Builder byInstances(IJobKey jobKey, Iterable<Integer> instanceIds) {
-      JobKeys.assertValid(jobKey);
-      checkNotNull(instanceIds);
-
-      return new Builder(
-          query.deepCopy()
-              .setOwner(new Identity().setRole(jobKey.getRole()))
-              .setEnvironment(jobKey.getEnvironment())
-              .setJobName(jobKey.getName())
-              .setInstanceIds(ImmutableSet.copyOf(instanceIds)));
-    }
-
-    /**
-     * A convenience method to scope this builder to {@link Tasks#ACTIVE_STATES}.
-     *
-     * @return A new Builder scoped to Tasks#ACTIVE_STATES.
-     */
-    public Builder active() {
-      return byStatus(Tasks.ACTIVE_STATES);
-    }
-
-    /**
-     * A convenience method to scope this builder to {@link Tasks#TERMINAL_STATES}.
-     *
-     * @return A new Builder scoped to Tasks#TERMINAL_STATES.
-     */
-    public Builder terminal() {
-      return byStatus(Tasks.TERMINAL_STATES);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java b/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java
deleted file mode 100644
index 0420ee9..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/ScheduleException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-/**
- * Exception class to signal a failure to schedule a task or job.
- */
-public class ScheduleException extends Exception {
-  public ScheduleException(String msg) {
-    super(msg);
-  }
-
-  public ScheduleException(String msg, Throwable t) {
-    super(msg, t);
-  }
-
-  public ScheduleException(Throwable t) {
-    super(t);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java b/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java
deleted file mode 100644
index a51c4e0..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/SchedulerException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-/**
- * Indicates some form of unexpected scheduler exception.
- */
-public class SchedulerException extends RuntimeException {
-  public SchedulerException(String message) {
-    super(message);
-  }
-  public SchedulerException(String message, Throwable cause) {
-    super(message, cause);
-  }
-  public SchedulerException(Throwable cause) {
-    super(cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java b/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java
deleted file mode 100644
index d98da3f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/base/Tasks.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.base;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.apiConstants;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Utility class providing convenience functions relating to tasks.
- */
-public final class Tasks {
-
-  public static final Function<IScheduledTask, IAssignedTask> SCHEDULED_TO_ASSIGNED =
-      new Function<IScheduledTask, IAssignedTask>() {
-        @Override public IAssignedTask apply(IScheduledTask task) {
-          return task.getAssignedTask();
-        }
-      };
-
-  public static final Function<IAssignedTask, ITaskConfig> ASSIGNED_TO_INFO =
-      new Function<IAssignedTask, ITaskConfig>() {
-        @Override public ITaskConfig apply(IAssignedTask task) {
-          return task.getTask();
-        }
-      };
-
-  public static final Function<IScheduledTask, ITaskConfig> SCHEDULED_TO_INFO =
-      Functions.compose(ASSIGNED_TO_INFO, SCHEDULED_TO_ASSIGNED);
-
-  public static final Function<IAssignedTask, String> ASSIGNED_TO_ID =
-      new Function<IAssignedTask, String>() {
-        @Override public String apply(IAssignedTask task) {
-          return task.getTaskId();
-        }
-      };
-
-  public static final Function<IScheduledTask, String> SCHEDULED_TO_ID =
-      Functions.compose(ASSIGNED_TO_ID, SCHEDULED_TO_ASSIGNED);
-
-  public static final Function<IAssignedTask, Integer> ASSIGNED_TO_INSTANCE_ID =
-      new Function<IAssignedTask, Integer>() {
-        @Override public Integer apply(IAssignedTask task) {
-          return task.getInstanceId();
-        }
-      };
-
-  public static final Function<IScheduledTask, Integer> SCHEDULED_TO_INSTANCE_ID =
-      Functions.compose(ASSIGNED_TO_INSTANCE_ID, SCHEDULED_TO_ASSIGNED);
-
-  public static final Function<ITaskConfig, IJobKey> INFO_TO_JOB_KEY =
-      new Function<ITaskConfig, IJobKey>() {
-        @Override public IJobKey apply(ITaskConfig task) {
-          return JobKeys.from(task);
-        }
-      };
-
-  public static final Function<IAssignedTask, IJobKey> ASSIGNED_TO_JOB_KEY =
-      Functions.compose(INFO_TO_JOB_KEY, ASSIGNED_TO_INFO);
-
-  public static final Function<IScheduledTask, IJobKey> SCHEDULED_TO_JOB_KEY =
-      Functions.compose(ASSIGNED_TO_JOB_KEY, SCHEDULED_TO_ASSIGNED);
-
-  /**
-   * Different states that an active task may be in.
-   */
-  public static final EnumSet<ScheduleStatus> ACTIVE_STATES =
-      EnumSet.copyOf(apiConstants.ACTIVE_STATES);
-
-  /**
-   * Terminal states, which a task should not move from.
-   */
-  public static final Set<ScheduleStatus> TERMINAL_STATES =
-      EnumSet.copyOf(apiConstants.TERMINAL_STATES);
-
-  public static final Predicate<ITaskConfig> IS_PRODUCTION =
-      new Predicate<ITaskConfig>() {
-        @Override public boolean apply(ITaskConfig task) {
-          return task.isProduction();
-        }
-      };
-
-  public static final Function<IScheduledTask, ScheduleStatus> GET_STATUS =
-      new Function<IScheduledTask, ScheduleStatus>() {
-        @Override public ScheduleStatus apply(IScheduledTask task) {
-          return task.getStatus();
-        }
-      };
-
-  /**
-   * Order by production flag (true, then false), subsorting by task ID.
-   */
-  public static final Ordering<IAssignedTask> SCHEDULING_ORDER =
-      Ordering.explicit(true, false)
-          .onResultOf(Functions.compose(Functions.forPredicate(IS_PRODUCTION), ASSIGNED_TO_INFO))
-          .compound(Ordering.natural().onResultOf(ASSIGNED_TO_ID));
-
-  private Tasks() {
-    // Utility class.
-  }
-
-  /**
-   * A utility method that returns a multi-map of tasks keyed by IJobKey.
-   * @param tasks Tasks to index by their job key.
-   * @return A multi-map of tasks keyed by job key.
-   */
-  public static Multimap<IJobKey, IScheduledTask> byJobKey(Iterable<IScheduledTask> tasks) {
-    return Multimaps.index(tasks, Tasks.SCHEDULED_TO_JOB_KEY);
-  }
-
-  public static boolean isActive(ScheduleStatus status) {
-    return ACTIVE_STATES.contains(status);
-  }
-
-  public static boolean isTerminated(ScheduleStatus status) {
-    return TERMINAL_STATES.contains(status);
-  }
-
-  public static String id(IScheduledTask task) {
-    return task.getAssignedTask().getTaskId();
-  }
-
-  // TODO(William Farner): Remove this once the code base is switched to IScheduledTask.
-  public static String id(ScheduledTask task) {
-    return task.getAssignedTask().getTaskId();
-  }
-
-  public static Set<String> ids(Iterable<IScheduledTask> tasks) {
-    return ImmutableSet.copyOf(Iterables.transform(tasks, SCHEDULED_TO_ID));
-  }
-
-  public static Set<String> ids(IScheduledTask... tasks) {
-    return ids(ImmutableList.copyOf(tasks));
-  }
-
-  public static Map<String, IScheduledTask> mapById(Iterable<IScheduledTask> tasks) {
-    return Maps.uniqueIndex(tasks, SCHEDULED_TO_ID);
-  }
-
-  public static String getRole(IScheduledTask task) {
-    return task.getAssignedTask().getTask().getOwner().getRole();
-  }
-
-  public static String getJob(IScheduledTask task) {
-    return task.getAssignedTask().getTask().getJobName();
-  }
-
-  public static final Ordering<IScheduledTask> LATEST_ACTIVITY = Ordering.natural()
-      .onResultOf(new Function<IScheduledTask, Long>() {
-        @Override public Long apply(IScheduledTask task) {
-          return Iterables.getLast(task.getTaskEvents()).getTimestamp();
-        }
-      });
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java
deleted file mode 100644
index 4839d0f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/configuration/ConfigurationManager.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.configuration;
-
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.twitter.aurora.gen.Constraint;
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.LimitConstraint;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskConfig._Fields;
-import com.twitter.aurora.gen.TaskConstraint;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.storage.entities.IConstraint;
-import com.twitter.aurora.scheduler.storage.entities.IIdentity;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConstraint;
-import com.twitter.aurora.scheduler.storage.entities.IValueConstraint;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.MorePreconditions;
-
-import static com.twitter.aurora.gen.apiConstants.DEFAULT_ENVIRONMENT;
-import static com.twitter.aurora.gen.apiConstants.GOOD_IDENTIFIER_PATTERN_JVM;
-
-/**
- * Manages translation from a string-mapped configuration to a concrete configuration type, and
- * defaults for optional values.
- *
- * TODO(William Farner): Add input validation to all fields (strings not empty, positive ints, etc).
- */
-public final class ConfigurationManager {
-
-  public static final String DEDICATED_ATTRIBUTE = "dedicated";
-
-  @VisibleForTesting public static final String HOST_CONSTRAINT = "host";
-  @VisibleForTesting public static final String RACK_CONSTRAINT = "rack";
-
-  private static final Pattern GOOD_IDENTIFIER = Pattern.compile(GOOD_IDENTIFIER_PATTERN_JVM);
-
-  private static final int MAX_IDENTIFIER_LENGTH = 255;
-
-  private static class DefaultField implements Closure<TaskConfig> {
-    private final _Fields field;
-    private final Object defaultValue;
-
-    DefaultField(_Fields field, Object defaultValue) {
-      this.field = field;
-      this.defaultValue = defaultValue;
-    }
-
-    @Override public void execute(TaskConfig task) {
-      if (!task.isSet(field)) {
-        task.setFieldValue(field, defaultValue);
-      }
-    }
-  }
-
-  private interface Validator<T> {
-    void validate(T value) throws TaskDescriptionException;
-  }
-
-  private static class GreaterThan implements Validator<Number> {
-    private final double min;
-    private final String label;
-
-    GreaterThan(double min, String label) {
-      this.min = min;
-      this.label = label;
-    }
-
-    @Override public void validate(Number value) throws TaskDescriptionException {
-      if (this.min >= value.doubleValue()) {
-        throw new TaskDescriptionException(label + " must be greater than " + this.min);
-      }
-    }
-  }
-
-  private static class RequiredFieldValidator<T> implements Validator<TaskConfig> {
-    private final _Fields field;
-    private final Validator<T> validator;
-
-    RequiredFieldValidator(_Fields field, Validator<T> validator) {
-      this.field = field;
-      this.validator = validator;
-    }
-
-    public void validate(TaskConfig task) throws TaskDescriptionException {
-      if (!task.isSet(field)) {
-        throw new TaskDescriptionException("Field " + field.getFieldName() + " is required.");
-      }
-      @SuppressWarnings("unchecked")
-      T value = (T) task.getFieldValue(field);
-      validator.validate(value);
-    }
-  }
-
-  private static final Iterable<Closure<TaskConfig>> DEFAULT_FIELD_POPULATORS =
-      ImmutableList.of(
-          new DefaultField(_Fields.IS_SERVICE, false),
-          new DefaultField(_Fields.PRIORITY, 0),
-          new DefaultField(_Fields.PRODUCTION, false),
-          new DefaultField(_Fields.MAX_TASK_FAILURES, 1),
-          new DefaultField(_Fields.TASK_LINKS, Maps.<String, String>newHashMap()),
-          new DefaultField(_Fields.REQUESTED_PORTS, Sets.<String>newHashSet()),
-          new DefaultField(_Fields.CONSTRAINTS, Sets.<Constraint>newHashSet()),
-          new DefaultField(_Fields.ENVIRONMENT, DEFAULT_ENVIRONMENT),
-          new Closure<TaskConfig>() {
-            @Override public void execute(TaskConfig task) {
-              if (!Iterables.any(task.getConstraints(), hasName(HOST_CONSTRAINT))) {
-                task.addToConstraints(hostLimitConstraint(1));
-              }
-            }
-          },
-          new Closure<TaskConfig>() {
-            @Override public void execute(TaskConfig task) {
-              if (!isDedicated(ITaskConfig.build(task))
-                  && task.isProduction()
-                  && task.isIsService()
-                  && !Iterables.any(task.getConstraints(), hasName(RACK_CONSTRAINT))) {
-
-                task.addToConstraints(rackLimitConstraint(1));
-              }
-            }
-          });
-
-  private static final Iterable<RequiredFieldValidator<?>> REQUIRED_FIELDS_VALIDATORS =
-      ImmutableList.<RequiredFieldValidator<?>>of(
-          new RequiredFieldValidator<>(_Fields.NUM_CPUS, new GreaterThan(0.0, "num_cpus")),
-          new RequiredFieldValidator<>(_Fields.RAM_MB, new GreaterThan(0.0, "ram_mb")),
-          new RequiredFieldValidator<>(_Fields.DISK_MB, new GreaterThan(0.0, "disk_mb")));
-
-  private ConfigurationManager() {
-    // Utility class.
-  }
-
-  @VisibleForTesting
-  static boolean isGoodIdentifier(String identifier) {
-    return GOOD_IDENTIFIER.matcher(identifier).matches()
-        && (identifier.length() <= MAX_IDENTIFIER_LENGTH);
-  }
-
-  private static void checkNotNull(Object value, String error) throws TaskDescriptionException {
-    if (value == null) {
-      throw new TaskDescriptionException(error);
-    }
-  }
-
-  private static void assertOwnerValidity(IIdentity jobOwner) throws TaskDescriptionException {
-    checkNotNull(jobOwner, "No job owner specified!");
-    checkNotNull(jobOwner.getRole(), "No job role specified!");
-    checkNotNull(jobOwner.getUser(), "No job user specified!");
-
-    if (!isGoodIdentifier(jobOwner.getRole())) {
-      throw new TaskDescriptionException(
-          "Job role contains illegal characters: " + jobOwner.getRole());
-    }
-
-    if (!isGoodIdentifier(jobOwner.getUser())) {
-      throw new TaskDescriptionException(
-          "Job user contains illegal characters: " + jobOwner.getUser());
-    }
-  }
-
-  private static String getRole(IValueConstraint constraint) {
-    return Iterables.getOnlyElement(constraint.getValues()).split("/")[0];
-  }
-
-  private static boolean isValueConstraint(ITaskConstraint taskConstraint) {
-    return taskConstraint.getSetField() == TaskConstraint._Fields.VALUE;
-  }
-
-  public static boolean isDedicated(ITaskConfig task) {
-    return Iterables.any(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE));
-  }
-
-  @Nullable
-  private static IConstraint getDedicatedConstraint(ITaskConfig task) {
-    return Iterables.find(task.getConstraints(), getConstraintByName(DEDICATED_ATTRIBUTE), null);
-  }
-
-  /**
-   * Check validity of and populates defaults in a job configuration.  This will return a deep copy
-   * of the provided job configuration with default configuration values applied, and configuration
-   * map values sanitized and applied to their respective struct fields.
-   *
-   * @param job Job to validate and populate.
-   * @return A deep copy of {@code job} that has been populated.
-   * @throws TaskDescriptionException If the job configuration is invalid.
-   */
-  public static IJobConfiguration validateAndPopulate(IJobConfiguration job)
-      throws TaskDescriptionException {
-
-    Preconditions.checkNotNull(job);
-
-    if (!job.isSetTaskConfig()) {
-      throw new TaskDescriptionException("Job configuration must have taskConfig set.");
-    }
-
-    if (!job.isSetInstanceCount()) {
-      throw new TaskDescriptionException("Job configuration does not have shardCount set.");
-    }
-
-    if (job.getInstanceCount() <= 0) {
-      throw new TaskDescriptionException("Shard count must be positive.");
-    }
-
-    JobConfiguration builder = job.newBuilder();
-
-    assertOwnerValidity(job.getOwner());
-
-    if (!JobKeys.isValid(job.getKey())) {
-      throw new TaskDescriptionException("Job key " + job.getKey() + " is invalid.");
-    }
-    if (!job.getKey().getRole().equals(job.getOwner().getRole())) {
-      throw new TaskDescriptionException("Role in job key must match job owner.");
-    }
-    if (!isGoodIdentifier(job.getKey().getRole())) {
-      throw new TaskDescriptionException(
-          "Job role contains illegal characters: " + job.getKey().getRole());
-    }
-    if (!isGoodIdentifier(job.getKey().getEnvironment())) {
-      throw new TaskDescriptionException(
-          "Job environment contains illegal characters: " + job.getKey().getEnvironment());
-    }
-    if (!isGoodIdentifier(job.getKey().getName())) {
-      throw new TaskDescriptionException(
-          "Job name contains illegal characters: " + job.getKey().getName());
-    }
-
-    builder.setTaskConfig(
-        validateAndPopulate(ITaskConfig.build(builder.getTaskConfig())).newBuilder());
-
-    // Only one of [service=true, cron_schedule] may be set.
-    if (!StringUtils.isEmpty(job.getCronSchedule()) && builder.getTaskConfig().isIsService()) {
-      throw new TaskDescriptionException(
-          "A service task may not be run on a cron schedule: " + builder);
-    }
-
-    return IJobConfiguration.build(builder);
-  }
-
-  /**
-   * Check validity of and populates defaults in a task configuration.  This will return a deep copy
-   * of the provided task configuration with default configuration values applied, and configuration
-   * map values sanitized and applied to their respective struct fields.
-   *
-   *
-   * @param config Task config to validate and populate.
-   * @return A reference to the modified {@code config} (for chaining).
-   * @throws TaskDescriptionException If the task is invalid.
-   */
-  public static ITaskConfig validateAndPopulate(ITaskConfig config)
-      throws TaskDescriptionException {
-
-    TaskConfig builder = config.newBuilder();
-
-    if (!builder.isSetRequestedPorts()) {
-      builder.setRequestedPorts(ImmutableSet.<String>of());
-    }
-
-    maybeFillLinks(builder);
-
-    assertOwnerValidity(config.getOwner());
-
-    if (!isGoodIdentifier(config.getJobName())) {
-      throw new TaskDescriptionException(
-          "Job name contains illegal characters: " + config.getJobName());
-    }
-
-    if (!isGoodIdentifier(config.getEnvironment())) {
-      throw new TaskDescriptionException(
-          "Environment contains illegal characters: " + config.getEnvironment());
-    }
-
-    if (!builder.isSetExecutorConfig()) {
-      throw new TaskDescriptionException("Configuration may not be null");
-    }
-
-    // Maximize the usefulness of any thrown error message by checking required fields first.
-    for (RequiredFieldValidator<?> validator : REQUIRED_FIELDS_VALIDATORS) {
-      validator.validate(builder);
-    }
-
-    IConstraint constraint = getDedicatedConstraint(config);
-    if (constraint != null) {
-      if (!isValueConstraint(constraint.getConstraint())) {
-        throw new TaskDescriptionException("A dedicated constraint must be of value type.");
-      }
-
-      IValueConstraint valueConstraint = constraint.getConstraint().getValue();
-
-      if (!(valueConstraint.getValues().size() == 1)) {
-        throw new TaskDescriptionException("A dedicated constraint must have exactly one value");
-      }
-
-      String dedicatedRole = getRole(valueConstraint);
-      if (!config.getOwner().getRole().equals(dedicatedRole)) {
-        throw new TaskDescriptionException(
-            "Only " + dedicatedRole + " may use hosts dedicated for that role.");
-      }
-    }
-
-    return ITaskConfig.build(applyDefaultsIfUnset(builder));
-  }
-
-  /**
-   * Provides a filter for the given constraint name.
-   *
-   * @param name The name of the constraint.
-   * @return A filter that matches the constraint.
-   */
-  public static Predicate<IConstraint> getConstraintByName(final String name) {
-    return new Predicate<IConstraint>() {
-      @Override public boolean apply(IConstraint constraint) {
-        return constraint.getName().equals(name);
-      }
-    };
-  }
-
-  @VisibleForTesting
-  public static Constraint hostLimitConstraint(int limit) {
-    return new Constraint(HOST_CONSTRAINT, TaskConstraint.limit(new LimitConstraint(limit)));
-  }
-
-  @VisibleForTesting
-  public static Constraint rackLimitConstraint(int limit) {
-    return new Constraint(RACK_CONSTRAINT, TaskConstraint.limit(new LimitConstraint(limit)));
-  }
-
-  private static Predicate<Constraint> hasName(final String name) {
-    MorePreconditions.checkNotBlank(name);
-    return new Predicate<Constraint>() {
-      @Override public boolean apply(Constraint constraint) {
-        return name.equals(constraint.getName());
-      }
-    };
-  }
-
-  /**
-   * Applies defaults to unset values in a task.
-   *
-   * @param task Task to apply defaults to.
-   * @return A reference to the (modified) {@code task}.
-   */
-  @VisibleForTesting
-  public static TaskConfig applyDefaultsIfUnset(TaskConfig task) {
-    for (Closure<TaskConfig> populator : DEFAULT_FIELD_POPULATORS) {
-      populator.execute(task);
-    }
-
-    return task;
-  }
-
-  /**
-   * Applies defaults to unset values in a job and its tasks.
-   *
-   * @param job Job to apply defaults to.
-   */
-  @VisibleForTesting
-  public static void applyDefaultsIfUnset(JobConfiguration job) {
-    ConfigurationManager.applyDefaultsIfUnset(job.getTaskConfig());
-  }
-
-  private static void maybeFillLinks(TaskConfig task) {
-    if (task.getTaskLinksSize() == 0) {
-      ImmutableMap.Builder<String, String> links = ImmutableMap.builder();
-      if (task.getRequestedPorts().contains("health")) {
-        links.put("health", "http://%host%:%port:health%");
-      }
-      if (task.getRequestedPorts().contains("http")) {
-        links.put("http", "http://%host%:%port:http%");
-      }
-      task.setTaskLinks(links.build());
-    }
-  }
-
-  /**
-   * Thrown when an invalid task or job configuration is encountered.
-   */
-  public static class TaskDescriptionException extends Exception {
-    public TaskDescriptionException(String msg) {
-      super(msg);
-    }
-  }
-}