You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:42 UTC

[49/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/Modules.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/Modules.java b/src/main/java/com/twitter/aurora/scheduler/app/Modules.java
deleted file mode 100644
index 72a80e1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/Modules.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-
-/**
- * A utility class for managing guice modules.
- */
-final class Modules {
-
-  private Modules() {
-    // Utility class
-  }
-
-  private static Module instantiateModule(final Class<? extends Module> moduleClass) {
-    try {
-      return moduleClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Failed to instantiate module %s. Are you sure it has a no-arg constructor?",
-              moduleClass.getName()),
-          e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Failed to instantiate module %s. Are you sure it's public?",
-              moduleClass.getName()),
-          e);
-    }
-  }
-
-  // Defensively wrap each module provided on the command-line in a PrivateModule that only
-  // exposes requested classes to ensure that we don't depend on surprise extra bindings across
-  // different implementations.
-  static Module wrapInPrivateModule(
-      Class<? extends Module> moduleClass,
-      final Iterable<Class<?>> exposedClasses) {
-
-    final Module module = instantiateModule(moduleClass);
-    return new PrivateModule() {
-      @Override protected void configure() {
-        install(module);
-        for (Class<?> klass : exposedClasses) {
-          expose(klass);
-        }
-      }
-    };
-  }
-
-  static Module getModule(Class<? extends Module> moduleClass) {
-    return instantiateModule(moduleClass);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java b/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
deleted file mode 100644
index 693c364..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/SchedulerMain.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.annotation.Nonnegative;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-
-import com.twitter.aurora.auth.CapabilityValidator;
-import com.twitter.aurora.auth.SessionValidator;
-import com.twitter.aurora.auth.UnsecureAuthModule;
-import com.twitter.aurora.scheduler.DriverFactory;
-import com.twitter.aurora.scheduler.DriverFactory.DriverFactoryImpl;
-import com.twitter.aurora.scheduler.MesosTaskFactory.ExecutorConfig;
-import com.twitter.aurora.scheduler.SchedulerLifecycle;
-import com.twitter.aurora.scheduler.cron.CronPredictor;
-import com.twitter.aurora.scheduler.cron.CronScheduler;
-import com.twitter.aurora.scheduler.cron.noop.NoopCronModule;
-import com.twitter.aurora.scheduler.local.IsolatedSchedulerModule;
-import com.twitter.aurora.scheduler.log.mesos.MesosLogStreamModule;
-import com.twitter.aurora.scheduler.storage.backup.BackupModule;
-import com.twitter.aurora.scheduler.storage.log.LogStorage;
-import com.twitter.aurora.scheduler.storage.log.LogStorageModule;
-import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import com.twitter.aurora.scheduler.storage.mem.MemStorageModule;
-import com.twitter.aurora.scheduler.thrift.ThriftConfiguration;
-import com.twitter.aurora.scheduler.thrift.ThriftModule;
-import com.twitter.aurora.scheduler.thrift.auth.ThriftAuthModule;
-import com.twitter.common.application.AbstractApplication;
-import com.twitter.common.application.AppLauncher;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.application.modules.HttpModule;
-import com.twitter.common.application.modules.LocalServiceRegistry;
-import com.twitter.common.application.modules.LogModule;
-import com.twitter.common.application.modules.StatsModule;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotEmpty;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.inject.Bindings;
-import com.twitter.common.logging.RootLogConfig;
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.SingletonService;
-import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
-import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule;
-import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
-import com.twitter.common.zookeeper.guice.client.flagged.FlaggedClientConfig;
-
-/**
- * Launcher for the aurora scheduler.
- */
-public class SchedulerMain extends AbstractApplication {
-
-  private static final Logger LOG = Logger.getLogger(SchedulerMain.class.getName());
-
-  @CmdLine(name = "testing_isolated_scheduler",
-      help = "If true, run in a testing mode with the scheduler isolated from other components.")
-  private static final Arg<Boolean> ISOLATED_SCHEDULER = Arg.create(false);
-
-  @NotNull
-  @CmdLine(name = "cluster_name", help = "Name to identify the cluster being served.")
-  private static final Arg<String> CLUSTER_NAME = Arg.create();
-
-  @NotNull
-  @NotEmpty
-  @CmdLine(name = "serverset_path", help = "ZooKeeper ServerSet path to register at.")
-  private static final Arg<String> SERVERSET_PATH = Arg.create();
-
-  @CmdLine(name = "mesos_ssl_keyfile",
-      help = "JKS keyfile for operating the Mesos Thrift-over-SSL interface.")
-  private static final Arg<File> MESOS_SSL_KEY_FILE = Arg.create();
-
-  @Nonnegative
-  @CmdLine(name = "thrift_port", help = "Thrift server port.")
-  private static final Arg<Integer> THRIFT_PORT = Arg.create(0);
-
-  @NotNull
-  @CmdLine(name = "thermos_executor_path", help = "Path to the thermos executor launch script.")
-  private static final Arg<String> THERMOS_EXECUTOR_PATH = Arg.create();
-
-  @CmdLine(name = "auth_module",
-      help = "A Guice module to provide auth bindings. NOTE: The default is unsecure.")
-  private static final Arg<? extends Class<? extends Module>> AUTH_MODULE =
-      Arg.create(UnsecureAuthModule.class);
-
-  private static final Iterable<Class<?>> AUTH_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
-      .add(SessionValidator.class)
-      .add(CapabilityValidator.class)
-      .build();
-
-  @CmdLine(name = "cron_module",
-      help = "A Guice module to provide cron bindings. NOTE: The default is a no-op.")
-  private static final Arg<? extends Class<? extends Module>> CRON_MODULE =
-      Arg.create(NoopCronModule.class);
-
-  private static final Iterable<Class<?>> CRON_MODULE_CLASSES = ImmutableList.<Class<?>>builder()
-      .add(CronPredictor.class)
-      .add(CronScheduler.class)
-      .build();
-
-  // TODO(Suman Karumuri): Pass in AUTH and CRON modules as extra modules
-  @CmdLine(name = "extra_modules",
-      help = "A list of modules that provide additional functionality.")
-  private static final Arg<List<Class<? extends Module>>> EXTRA_MODULES =
-      Arg.create((List<Class<? extends Module>>) ImmutableList.<Class<? extends Module>>of());
-
-  @Inject private SingletonService schedulerService;
-  @Inject private LocalServiceRegistry serviceRegistry;
-  @Inject private SchedulerLifecycle schedulerLifecycle;
-  @Inject private Lifecycle appLifecycle;
-  @Inject private Optional<RootLogConfig.Configuration> glogConfig;
-
-  private static Iterable<? extends Module> getSystemModules() {
-    return ImmutableList.of(
-        new LogModule(),
-        new HttpModule(),
-        new StatsModule()
-    );
-  }
-
-  private static Iterable<? extends Module> getExtraModules() {
-    Builder<Module> modules = ImmutableList.builder();
-    modules.add(Modules.wrapInPrivateModule(AUTH_MODULE.get(), AUTH_MODULE_CLASSES))
-        .add(Modules.wrapInPrivateModule(CRON_MODULE.get(), CRON_MODULE_CLASSES));
-
-    for (Class<? extends Module> moduleClass : EXTRA_MODULES.get()) {
-      modules.add(Modules.getModule(moduleClass));
-    }
-
-    return modules.build();
-  }
-
-  static Iterable<? extends Module> getModules(
-      String clusterName,
-      String serverSetPath,
-      ClientConfig zkClientConfig) {
-
-    return ImmutableList.<Module>builder()
-        .addAll(getSystemModules())
-        .add(new AppModule(clusterName, serverSetPath, zkClientConfig))
-        .addAll(getExtraModules())
-        .add(new LogStorageModule())
-        .add(new MemStorageModule(Bindings.annotatedKeyFactory(LogStorage.WriteBehind.class)))
-        .add(new ThriftModule())
-        .add(new ThriftAuthModule())
-        .build();
-  }
-
-  @Override
-  public Iterable<? extends Module> getModules() {
-    Module additional;
-    final ClientConfig zkClientConfig = FlaggedClientConfig.create();
-    if (ISOLATED_SCHEDULER.get()) {
-      additional = new IsolatedSchedulerModule();
-    } else {
-      // TODO(Kevin Sweeney): Push these bindings down into a "production" module.
-      additional = new AbstractModule() {
-        @Override protected void configure() {
-          bind(DriverFactory.class).to(DriverFactoryImpl.class);
-          bind(DriverFactoryImpl.class).in(Singleton.class);
-          install(new MesosLogStreamModule(zkClientConfig));
-        }
-      };
-    }
-
-    Module configModule = new AbstractModule() {
-      @Override protected void configure() {
-        bind(ThriftConfiguration.class).toInstance(new ThriftConfiguration() {
-          @Override public Optional<InputStream> getSslKeyStream() throws FileNotFoundException {
-            if (MESOS_SSL_KEY_FILE.hasAppliedValue()) {
-              return Optional.<InputStream>of(new FileInputStream(MESOS_SSL_KEY_FILE.get()));
-            } else {
-              return Optional.absent();
-            }
-          }
-
-          @Override public int getServingPort() {
-            return THRIFT_PORT.get();
-          }
-        });
-        bind(ExecutorConfig.class).toInstance(new ExecutorConfig(THERMOS_EXECUTOR_PATH.get()));
-      }
-    };
-
-    return ImmutableList.<Module>builder()
-        .add(new BackupModule(SnapshotStoreImpl.class))
-        .addAll(getModules(CLUSTER_NAME.get(), SERVERSET_PATH.get(), zkClientConfig))
-        .add(new ZooKeeperClientModule(zkClientConfig))
-        .add(configModule)
-        .add(additional)
-        .build();
-  }
-
-  @Override
-  public void run() {
-    if (glogConfig.isPresent()) {
-      // Setup log4j to match our jul glog config in order to pick up zookeeper logging.
-      Log4jConfigurator.configureConsole(glogConfig.get());
-    } else {
-      LOG.warning("Running without expected glog configuration.");
-    }
-
-    LeadershipListener leaderListener = schedulerLifecycle.prepare();
-
-    Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
-    if (!primarySocket.isPresent()) {
-      throw new IllegalStateException("No primary service registered with LocalServiceRegistry.");
-    }
-
-    try {
-      schedulerService.lead(
-          primarySocket.get(),
-          serviceRegistry.getAuxiliarySockets(),
-          leaderListener);
-    } catch (Group.WatchException e) {
-      throw new IllegalStateException("Failed to watch group and lead service.", e);
-    } catch (Group.JoinException e) {
-      throw new IllegalStateException("Failed to join scheduler service group.", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
-    }
-
-    appLifecycle.awaitShutdown();
-  }
-
-  public static void main(String[] args) {
-    AppLauncher.launch(SchedulerMain.class, args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java b/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
deleted file mode 100644
index faf3269..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/AsyncModule.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.logging.Logger;
-
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.BindingAnnotation;
-import com.google.inject.Key;
-import com.google.inject.PrivateModule;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
-import com.twitter.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
-import com.twitter.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
-import com.twitter.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
-import com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatImpl;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Random;
-import com.twitter.common.util.TruncatedBinaryBackoff;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.twitter.aurora.scheduler.async.HistoryPruner.PruneThreshold;
-import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl;
-import static com.twitter.aurora.scheduler.async.Preemptor.PreemptorImpl.PreemptionDelay;
-import static com.twitter.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
-
-/**
- * Binding module for async task management.
- */
-public class AsyncModule extends AbstractModule {
-
-  private static final Logger LOG = Logger.getLogger(AsyncModule.class.getName());
-
-  @CmdLine(name = "async_worker_threads",
-      help = "The number of worker threads to process async task operations with.")
-  private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
-
-  @CmdLine(name = "transient_task_state_timeout",
-      help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
-  private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @CmdLine(name = "initial_schedule_delay",
-      help = "Initial amount of time to wait before attempting to schedule a PENDING task.")
-  private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_DELAY =
-      Arg.create(Amount.of(1L, Time.SECONDS));
-
-  @CmdLine(name = "max_schedule_delay",
-      help = "Maximum delay between attempts to schedule a PENDING tasks.")
-  private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_DELAY =
-      Arg.create(Amount.of(30L, Time.SECONDS));
-
-  @CmdLine(name = "min_offer_hold_time",
-      help = "Minimum amount of time to hold a resource offer before declining.")
-  private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME =
-      Arg.create(Amount.of(5, Time.MINUTES));
-
-  @CmdLine(name = "history_prune_threshold",
-      help = "Time after which the scheduler will prune terminated task history.")
-  private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
-      Arg.create(Amount.of(2L, Time.DAYS));
-
-  @CmdLine(name = "max_schedule_attempts_per_sec",
-      help = "Maximum number of scheduling attempts to make per second.")
-  private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(10D);
-
-  @CmdLine(name = "flapping_task_threshold",
-      help = "A task that repeatedly runs for less than this time is considered to be flapping.")
-  private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @CmdLine(name = "initial_flapping_task_delay",
-      help = "Initial amount of time to wait before attempting to schedule a flapping task.")
-  private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY =
-      Arg.create(Amount.of(30L, Time.SECONDS));
-
-  @CmdLine(name = "max_flapping_task_delay",
-      help = "Maximum delay between attempts to schedule a flapping task.")
-  private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY =
-      Arg.create(Amount.of(5L, Time.MINUTES));
-
-  @CmdLine(name = "max_reschedule_task_delay_on_startup",
-      help = "Upper bound of random delay for pending task rescheduling on scheduler startup.")
-  private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY =
-      Arg.create(Amount.of(30, Time.SECONDS));
-
-  @CmdLine(name = "preemption_delay",
-      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
-  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
-      Arg.create(Amount.of(10L, Time.MINUTES));
-
-  @CmdLine(name = "enable_preemptor",
-      help = "Enable the preemptor and preemption")
-  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
-
-  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
-    @Override public Optional<String> findPreemptionSlotFor(String taskId) {
-      return Optional.absent();
-    }
-  };
-
-  @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while "
-      + "trying to satisfy a task preempting another.")
-  private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
-      Arg.create(Amount.of(3L, Time.MINUTES));
-
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  private @interface PreemptionBinding { }
-
-  @VisibleForTesting
-  static final Key<Preemptor> PREEMPTOR_KEY = Key.get(Preemptor.class, PreemptionBinding.class);
-
-  @Override
-  protected void configure() {
-    // Don't worry about clean shutdown, these can be daemon and cleanup-free.
-    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
-        ASYNC_WORKER_THREADS.get(),
-        new ThreadFactoryBuilder().setNameFormat("AsyncProcessor-%d").setDaemon(true).build());
-    Stats.exportSize("timeout_queue_size", executor.getQueue());
-    Stats.export(new StatImpl<Long>("async_tasks_completed") {
-      @Override public Long read() {
-        return executor.getCompletedTaskCount();
-      }
-    });
-
-    // AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses
-    // a MultiBinder, which cannot span multiple injectors.
-    binder().install(new PrivateModule() {
-      @Override protected void configure() {
-        bind(new TypeLiteral<Amount<Long, Time>>() { })
-            .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get());
-        bind(ScheduledExecutorService.class).toInstance(executor);
-
-        bind(TaskTimeout.class).in(Singleton.class);
-        requireBinding(StatsProvider.class);
-        expose(TaskTimeout.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class);
-
-    binder().install(new PrivateModule() {
-      @Override protected void configure() {
-        bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings(
-            new TruncatedBinaryBackoff(INITIAL_SCHEDULE_DELAY.get(), MAX_SCHEDULE_DELAY.get()),
-            RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get())));
-
-        bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class)
-            .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings(
-                new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()),
-                FLAPPING_THRESHOLD.get(),
-                MAX_RESCHEDULING_DELAY.get()));
-
-        bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class);
-        if (ENABLE_PREEMPTOR.get()) {
-          bind(PREEMPTOR_KEY).to(PreemptorImpl.class);
-          bind(PreemptorImpl.class).in(Singleton.class);
-          LOG.info("Preemptor Enabled.");
-        } else {
-          bind(PREEMPTOR_KEY).toInstance(NULL_PREEMPTOR);
-          LOG.warning("Preemptor Disabled.");
-        }
-        expose(PREEMPTOR_KEY);
-        bind(new TypeLiteral<Amount<Long, Time>>() {
-        }).annotatedWith(PreemptionDelay.class)
-            .toInstance(PREEMPTION_DELAY.get());
-        bind(TaskGroups.class).in(Singleton.class);
-        expose(TaskGroups.class);
-      }
-    });
-    bindTaskScheduler(binder(), PREEMPTOR_KEY, RESERVATION_DURATION.get());
-    PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
-
-    binder().install(new PrivateModule() {
-      @Override protected void configure() {
-        bind(OfferReturnDelay.class).to(RandomJitterReturnDelay.class);
-        bind(ScheduledExecutorService.class).toInstance(executor);
-        bind(OfferQueue.class).to(OfferQueueImpl.class);
-        bind(OfferQueueImpl.class).in(Singleton.class);
-        expose(OfferQueue.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), OfferQueue.class);
-
-    binder().install(new PrivateModule() {
-      @Override protected void configure() {
-        // TODO(ksweeney): Create a configuration validator module so this can be injected.
-        // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store
-        bind(Integer.class).annotatedWith(PruneThreshold.class).toInstance(100);
-        bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(PruneThreshold.class)
-            .toInstance(HISTORY_PRUNE_THRESHOLD.get());
-        bind(ScheduledExecutorService.class).toInstance(executor);
-
-        bind(HistoryPruner.class).in(Singleton.class);
-        expose(HistoryPruner.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
-  }
-
-  /**
-   * This method exists because we want to test the wiring up of TaskSchedulerImpl class to the
-   * PubSub system in the TaskSchedulerImplTest class. The method has a complex signature because
-   * the binding of the TaskScheduler and friends occurs in a PrivateModule which does not interact
-   * well with the MultiBinder that backs the PubSub system.
-   */
-  @VisibleForTesting
-  static void bindTaskScheduler(
-      Binder binder,
-      final Key<Preemptor> preemptorKey,
-      final Amount<Long, Time> reservationDuration) {
-        binder.install(new PrivateModule() {
-          @Override protected void configure() {
-            bind(Preemptor.class).to(preemptorKey);
-            bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class)
-                .toInstance(reservationDuration);
-            bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
-            bind(TaskSchedulerImpl.class).in(Singleton.class);
-            expose(TaskScheduler.class);
-          }
-        });
-        PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
-  }
-
-  /**
-   * Returns offers after a random duration within a fixed window.
-   */
-  private static class RandomJitterReturnDelay implements OfferReturnDelay {
-    private static final int JITTER_WINDOW_MS = Amount.of(1, Time.MINUTES).as(Time.MILLISECONDS);
-
-    private final int minHoldTimeMs = MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS);
-    private final Random random = new Random.SystemRandom(new java.util.Random());
-
-    @Override public Amount<Integer, Time> get() {
-      return Amount.of(minHoldTimeMs + random.nextInt(JITTER_WINDOW_MS), Time.MILLISECONDS);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java b/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java
deleted file mode 100644
index 9af6d36..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/HistoryPruner.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.eventbus.Subscribe;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.state.StateManager;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import static com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-
-/**
- * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
- * transitioning into one of the inactive states.
- */
-public class HistoryPruner implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
-
-  @VisibleForTesting
-  static final Query.Builder INACTIVE_QUERY = Query.unscoped().terminal();
-
-  private final Multimap<IJobKey, String> tasksByJob =
-      Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
-  @VisibleForTesting
-  Multimap<IJobKey, String> getTasksByJob() {
-    return tasksByJob;
-  }
-
-  private final ScheduledExecutorService executor;
-  private final Storage storage;
-  private final StateManager stateManager;
-  private final Clock clock;
-  private final long pruneThresholdMillis;
-  private final int perJobHistoryGoal;
-  private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
-
-  @BindingAnnotation
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface PruneThreshold { }
-
-  @Inject
-  HistoryPruner(
-      final ScheduledExecutorService executor,
-      final Storage storage,
-      final StateManager stateManager,
-      final Clock clock,
-      @PruneThreshold Amount<Long, Time> inactivePruneThreshold,
-      @PruneThreshold int perJobHistoryGoal) {
-
-    this.executor = checkNotNull(executor);
-    this.storage = checkNotNull(storage);
-    this.stateManager = checkNotNull(stateManager);
-    this.clock = checkNotNull(clock);
-    this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
-    this.perJobHistoryGoal = perJobHistoryGoal;
-  }
-
-  @VisibleForTesting
-  long calculateTimeout(long taskEventTimestampMillis) {
-    return pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis);
-  }
-
-  /**
-   * When triggered, records an inactive task state change.
-   *
-   * @param change Event when a task changes state.
-   */
-  @Subscribe
-  public void recordStateChange(TaskStateChange change) {
-    if (Tasks.isTerminated(change.getNewState())) {
-      registerInactiveTask(
-          Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
-          change.getTaskId(),
-          calculateTimeout(clock.nowMillis()));
-    }
-  }
-
-  /**
-   * When triggered, iterates through inactive tasks in the system and prunes tasks that
-   * exceed the history goal for a job or are beyond the time threshold.
-   *
-   * @param event A new StorageStarted event.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task
-        : LATEST_ACTIVITY.sortedCopy(Storage.Util.consistentFetchTasks(storage, INACTIVE_QUERY))) {
-
-      registerInactiveTask(
-          Tasks.SCHEDULED_TO_JOB_KEY.apply(task),
-          Tasks.id(task),
-          calculateTimeout(Iterables.getLast(task.getTaskEvents()).getTimestamp()));
-    }
-  }
-
-  private void deleteTasks(Set<String> taskIds) {
-    LOG.info("Pruning inactive tasks " + taskIds);
-    stateManager.deleteTasks(taskIds);
-  }
-
-  /**
-   * When triggered, removes the tasks scheduled for pruning and cancels any existing future.
-   *
-   * @param event A new TasksDeleted event.
-   */
-  @Subscribe
-  public void tasksDeleted(final TasksDeleted event) {
-    for (IScheduledTask task : event.getTasks()) {
-      String id = Tasks.id(task);
-      tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
-      Future<?> future = taskIdToFuture.remove(id);
-      if (future != null) {
-        future.cancel(false);
-      }
-    }
-  }
-
-  private void registerInactiveTask(
-      final IJobKey jobKey,
-      final String taskId,
-      long timeRemaining) {
-
-    LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
-    // Insert the latest inactive task at the tail.
-    tasksByJob.put(jobKey, taskId);
-    Runnable runnable = new Runnable() {
-      @Override public void run() {
-        LOG.info("Pruning expired inactive task " + taskId);
-        tasksByJob.remove(jobKey, taskId);
-        taskIdToFuture.remove(taskId);
-        deleteTasks(ImmutableSet.of(taskId));
-      }
-    };
-    taskIdToFuture.put(taskId, executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS));
-
-    ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
-    Collection<String> tasks = tasksByJob.get(jobKey);
-    // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
-    // multimap when accessing any of its collection views".
-    synchronized (tasksByJob) {
-      Iterator<String> iterator = tasks.iterator();
-      while (tasks.size() > perJobHistoryGoal) {
-        // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
-        // order.
-        String id = iterator.next();
-        iterator.remove();
-        pruneTaskIds.add(id);
-        Future<?> future = taskIdToFuture.remove(id);
-        if (future != null) {
-          future.cancel(false);
-        }
-      }
-    }
-
-    Set<String> ids = pruneTaskIds.build();
-    if (!ids.isEmpty()) {
-      deleteTasks(ids);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java b/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java
deleted file mode 100644
index 95334ff..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/OfferQueue.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.gen.HostStatus;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.scheduler.Driver;
-import com.twitter.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.HostMaintenanceStateChange;
-import com.twitter.aurora.scheduler.state.MaintenanceController;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-
-import static com.twitter.aurora.gen.MaintenanceMode.DRAINED;
-import static com.twitter.aurora.gen.MaintenanceMode.DRAINING;
-import static com.twitter.aurora.gen.MaintenanceMode.NONE;
-import static com.twitter.aurora.gen.MaintenanceMode.SCHEDULED;
-
-/**
- * Tracks the Offers currently known by the scheduler
- */
-public interface OfferQueue extends EventSubscriber {
-
-  /**
-   * Notifies the scheduler of a new resource offer.
-   *
-   * @param offer Newly-available resource offer.
-   */
-  void addOffer(Offer offer);
-
-  /**
-   * Invalidates an offer.  This indicates that the scheduler should not attempt to match any
-   * tasks against the offer.
-   *
-   * @param offer Canceled offer.
-   */
-  void cancelOffer(OfferID offer);
-
-  /**
-   * Launches the first task that satisfies the {@code acceptor} by returning a {@link TaskInfo}.
-   *
-   * @param acceptor Function that determines if an offer is accepted.
-   * @return {@code true} if the task was launched, {@code false} if no offers satisfied the
-   *         {@code acceptor}.
-   * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the
-   *                         task.
-   */
-  boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor) throws LaunchException;
-
-  /**
-   * Notifies the offer queue that a host has changed state.
-   *
-   * @param change State change notification.
-   */
-  void hostChangedState(HostMaintenanceStateChange change);
-
-  /**
-   * Gets the offers that the scheduler is holding.
-   *
-   * @return A snapshot of the offers that the scheduler is currently holding.
-   */
-  Iterable<Offer> getOffers();
-
-  /**
-   * Calculates the amount of time before an offer should be 'returned' by declining it.
-   * The delay is calculated for each offer that is received, so the return delay may be
-   * fixed or variable.
-   */
-  public interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
-  }
-
-  /**
-   * Thrown when there was an unexpected failure trying to launch a task.
-   */
-  static class LaunchException extends Exception {
-    LaunchException(String msg) {
-      super(msg);
-    }
-
-    LaunchException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-
-  class OfferQueueImpl implements OfferQueue {
-    private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
-
-    static final Comparator<HostOffer> PREFERENCE_COMPARATOR =
-        // Currently, the only preference is based on host maintenance status.
-        Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED)
-            .onResultOf(new Function<HostOffer, MaintenanceMode>() {
-              @Override public MaintenanceMode apply(HostOffer offer) {
-                return offer.mode;
-              }
-            })
-            .compound(Ordering.arbitrary());
-
-    private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
-    private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races");
-
-    private final Driver driver;
-    private final OfferReturnDelay returnDelay;
-    private final ScheduledExecutorService executor;
-    private final MaintenanceController maintenance;
-
-    @Inject
-    OfferQueueImpl(Driver driver,
-        OfferReturnDelay returnDelay,
-        ScheduledExecutorService executor,
-        MaintenanceController maintenance) {
-
-      this.driver = driver;
-      this.returnDelay = returnDelay;
-      this.executor = executor;
-      this.maintenance = maintenance;
-      // Potential gotcha - since this is now a ConcurrentSkipListSet, size() is more expensive.
-      // Could track this separately if it turns out to pose problems.
-      Stats.exportSize("outstanding_offers", hostOffers);
-    }
-
-    @Override
-    public void addOffer(final Offer offer) {
-      // We run a slight risk of a race here, which is acceptable.  The worst case is that we
-      // temporarily hold two offers for the same host, which should be corrected when we return
-      // them after the return delay.
-      // There's also a chance that we return an offer for compaction ~simultaneously with the
-      // same-host offer being canceled/returned.  This is also fine.
-      List<HostOffer> sameSlave = FluentIterable.from(hostOffers)
-          .filter(new Predicate<HostOffer>() {
-            @Override public boolean apply(HostOffer hostOffer) {
-              return hostOffer.offer.getSlaveId().equals(offer.getSlaveId());
-            }
-          })
-          .toList();
-      if (sameSlave.isEmpty()) {
-        hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
-        executor.schedule(
-            new Runnable() {
-              @Override public void run() {
-                removeAndDecline(offer.getId());
-              }
-            },
-            returnDelay.get().as(Time.MILLISECONDS),
-            TimeUnit.MILLISECONDS);
-      } else {
-        // If there are existing offers for the slave, decline all of them so the master can
-        // compact all of those offers into a single offer and send them back.
-        LOG.info("Returning " + (sameSlave.size() + 1)
-            + " offers for " + offer.getSlaveId().getValue() + " for compaction.");
-        decline(offer.getId());
-        for (HostOffer sameSlaveOffer : sameSlave) {
-          removeAndDecline(sameSlaveOffer.offer.getId());
-        }
-      }
-    }
-
-    void removeAndDecline(OfferID id) {
-      if (removeFromHostOffers(id)) {
-        decline(id);
-      }
-    }
-
-    void decline(OfferID id) {
-      LOG.fine("Declining offer " + id);
-      driver.declineOffer(id);
-    }
-
-    @Override
-    public void cancelOffer(final OfferID offerId) {
-      removeFromHostOffers(offerId);
-    }
-
-    private boolean removeFromHostOffers(final OfferID offerId) {
-      Preconditions.checkNotNull(offerId);
-
-      // The small risk of inconsistency is acceptable here - if we have an accept/remove race
-      // on an offer, the master will mark the task as LOST and it will be retried.
-      return Iterables.removeIf(hostOffers,
-          new Predicate<HostOffer>() {
-            @Override public boolean apply(HostOffer input) {
-              return input.offer.getId().equals(offerId);
-            }
-          });
-    }
-
-    @Override
-    public Iterable<Offer> getOffers() {
-      return Iterables.unmodifiableIterable(
-          FluentIterable.from(hostOffers)
-              .transform(new Function<HostOffer, Offer>() {
-                @Override public Offer apply(HostOffer offer) {
-                  return offer.offer;
-                }
-              }));
-    }
-
-    /**
-     * Updates the preference of a host's offers.
-     *
-     * @param change Host change notification.
-     */
-    @Subscribe
-    public void hostChangedState(HostMaintenanceStateChange change) {
-      final HostStatus hostStatus = change.getStatus();
-
-      // Remove and re-add a host's offers to re-sort based on its new hostStatus
-      Set<HostOffer> changedOffers = FluentIterable.from(hostOffers)
-          .filter(new Predicate<HostOffer>() {
-            @Override public boolean apply(HostOffer hostOffer) {
-              return hostOffer.offer.getHostname().equals(hostStatus.getHost());
-            }
-          })
-          .toSet();
-      hostOffers.removeAll(changedOffers);
-      hostOffers.addAll(
-          FluentIterable.from(changedOffers)
-              .transform(new Function<HostOffer, HostOffer>() {
-                @Override public HostOffer apply(HostOffer hostOffer) {
-                  return new HostOffer(hostOffer.offer, hostStatus.getMode());
-                }
-              })
-              .toSet());
-    }
-
-    /**
-     * Notifies the queue that the driver is disconnected, and all the stored offers are now
-     * invalid.
-     * <p>
-     * The queue takes this as a signal to flush its queue.
-     *
-     * @param event Disconnected event.
-     */
-    @Subscribe
-    public void driverDisconnected(DriverDisconnected event) {
-      LOG.info("Clearing stale offers since the driver is disconnected.");
-      hostOffers.clear();
-    }
-
-    /**
-     * Encapsulate an offer from a host, and the host's maintenance mode.
-     */
-    private static class HostOffer {
-      private final Offer offer;
-      private final MaintenanceMode mode;
-
-      HostOffer(Offer offer, MaintenanceMode mode) {
-        this.offer = offer;
-        this.mode = mode;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (!(o instanceof HostOffer)) {
-          return false;
-        }
-        HostOffer other = (HostOffer) o;
-        return Objects.equal(offer, other.offer) && (mode == other.mode);
-      }
-
-      @Override
-      public int hashCode() {
-        return Objects.hashCode(offer, mode);
-      }
-    }
-
-    @Override
-    public boolean launchFirst(Function<Offer, Optional<TaskInfo>> acceptor)
-        throws LaunchException {
-
-      // It's important that this method is not called concurrently - doing so would open up the
-      // possibility of a race between the same offers being accepted by different threads.
-
-      for (HostOffer hostOffer : hostOffers) {
-        Optional<TaskInfo> assignment = acceptor.apply(hostOffer.offer);
-        if (assignment.isPresent()) {
-          // Guard against an offer being removed after we grabbed it from the iterator.
-          // If that happens, the offer will not exist in hostOffers, and we can immediately
-          // send it back to LOST for quick reschedule.
-          if (hostOffers.remove(hostOffer)) {
-            try {
-              driver.launchTask(hostOffer.offer.getId(), assignment.get());
-              return true;
-            } catch (IllegalStateException e) {
-              // TODO(William Farner): Catch only the checked exception produced by Driver
-              // once it changes from throwing IllegalStateException when the driver is not yet
-              // registered.
-              throw new LaunchException("Failed to launch task.", e);
-            }
-          } else {
-            offerRaces.incrementAndGet();
-            throw new LaunchException(
-                "Accepted offer no longer exists in offer queue, likely data race.");
-          }
-        }
-      }
-
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java b/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
deleted file mode 100644
index a01790c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/Preemptor.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.scheduler.ResourceSlot;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.ScheduleException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.filter.SchedulingFilter;
-import com.twitter.aurora.scheduler.state.SchedulerCore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import org.apache.mesos.Protos.Offer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.gen.ScheduleStatus.PREEMPTING;
-import static com.twitter.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-
-/**
- * Preempts active tasks in favor of higher priority tasks.
- */
-public interface Preemptor {
-
-  /**
-   * Preempts active tasks in favor of the input task.
-   *
-   * @param taskId ID of the preempting task.
-   * @return ID of the slave where preemption occured.
-   */
-  Optional<String> findPreemptionSlotFor(String taskId);
-
-  /**
-   * A task preemptor that tries to find tasks that are waiting to be scheduled, which are of higher
-   * priority than tasks that are currently running.
-   *
-   * To avoid excessive churn, the preemptor requires that a task is PENDING for a duration
-   * (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible to preempt other
-   * tasks.
-   */
-  class PreemptorImpl implements Preemptor {
-
-    /**
-     * Binding annotation for the time interval after which a pending task becomes eligible to
-     * preempt other tasks.
-     */
-    @BindingAnnotation
-    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-    @interface PreemptionDelay { }
-
-    @VisibleForTesting
-    static final Query.Builder CANDIDATE_QUERY = Query.statusScoped(
-        EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(PENDING, PREEMPTING))));
-
-    private static final Logger LOG = Logger.getLogger(PreemptorImpl.class.getName());
-
-    private static final Function<IAssignedTask, Integer> GET_PRIORITY =
-        new Function<IAssignedTask, Integer>() {
-          @Override public Integer apply(IAssignedTask task) {
-            return task.getTask().getPriority();
-          }
-        };
-
-    private final AtomicLong tasksPreempted = Stats.exportLong("preemptor_tasks_preempted");
-    private final AtomicLong failedPreemptions = Stats.exportLong("preemptor_failed_preemptions");
-    // Incremented every time the preemptor is invoked and finds tasks pending and preemptable tasks
-    private final AtomicLong attemptedPreemptions = Stats.exportLong("preemptor_attempts");
-    // Incremented every time we fail to find tasks to preempt for a pending task.
-    private final AtomicLong noSlotsFound = Stats.exportLong("preemptor_no_slots_found");
-
-    private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() {
-      @Override public boolean apply(IScheduledTask task) {
-        return (clock.nowMillis() - Iterables.getLast(task.getTaskEvents()).getTimestamp())
-            >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
-      }
-    };
-
-    private final Storage storage;
-    private final SchedulerCore scheduler;
-    private final OfferQueue offerQueue;
-    private final SchedulingFilter schedulingFilter;
-    private final Amount<Long, Time> preemptionCandidacyDelay;
-    private final Clock clock;
-
-    /**
-     * Creates a new preemptor.
-     *
-     * @param storage Backing store for tasks.
-     * @param scheduler Scheduler to fetch task information from, and instruct when preempting
-     *                  tasks.
-     * @param offerQueue Queue that contains available Mesos resource offers.
-     * @param schedulingFilter Filter to identify whether tasks may reside on given slaves.
-     * @param preemptionCandidacyDelay Time a task must be PENDING before it may preempt other
-     *                                 tasks.
-     * @param clock Clock to check current time.
-     */
-    @Inject
-    PreemptorImpl(
-        Storage storage,
-        SchedulerCore scheduler,
-        OfferQueue offerQueue,
-        SchedulingFilter schedulingFilter,
-        @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-        Clock clock) {
-
-      this.storage = checkNotNull(storage);
-      this.scheduler = checkNotNull(scheduler);
-      this.offerQueue = checkNotNull(offerQueue);
-      this.schedulingFilter = checkNotNull(schedulingFilter);
-      this.preemptionCandidacyDelay = checkNotNull(preemptionCandidacyDelay);
-      this.clock = checkNotNull(clock);
-    }
-
-    private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
-      return Lists.newArrayList(Iterables.transform(Iterables.filter(
-          Storage.Util.consistentFetchTasks(storage, query), filter),
-          SCHEDULED_TO_ASSIGNED));
-    }
-
-    private List<IAssignedTask> fetch(Query.Builder query) {
-      return fetch(query, Predicates.<IScheduledTask>alwaysTrue());
-    }
-
-    private static final Function<IAssignedTask, String> TASK_TO_SLAVE_ID =
-        new Function<IAssignedTask, String>() {
-          @Override public String apply(IAssignedTask input) {
-            return input.getSlaveId();
-          }
-        };
-
-    private static final Function<IAssignedTask, String> TASK_TO_HOST =
-        new Function<IAssignedTask, String>() {
-          @Override public String apply(IAssignedTask input) {
-            return input.getSlaveHost();
-          }
-        };
-
-    private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
-      return new Predicate<IAssignedTask>() {
-        @Override public boolean apply(IAssignedTask possibleVictim) {
-          return preemptionFilter(possibleVictim).apply(pending);
-        }
-      };
-    }
-
-    private static final Function<IAssignedTask, ResourceSlot> TASK_TO_RESOURCES =
-        new Function<IAssignedTask, ResourceSlot>() {
-          @Override public ResourceSlot apply(IAssignedTask input) {
-            return ResourceSlot.from(input.getTask());
-          }
-        };
-
-    private static final Function<Offer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        new Function<Offer, ResourceSlot>() {
-          @Override public ResourceSlot apply(Offer offer) {
-            return ResourceSlot.from(offer);
-          }
-        };
-
-    private static final Function<Offer, String> OFFER_TO_HOST =
-        new Function<Offer, String>() {
-          @Override public String apply(Offer offer) {
-            return offer.getHostname();
-          }
-        };
-
-    // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector
-    // ordering
-    private static final Ordering<IAssignedTask> RESOURCE_ORDER =
-        ResourceSlot.ORDER.onResultOf(TASK_TO_RESOURCES).reverse();
-
-    /**
-     * Optional.absent indicates that this slave does not have enough resources to satisfy the task.
-     * The empty set indicates the offers (slack) are enough.
-     * A set with elements indicates those tasks and the offers are enough.
-     */
-    private Optional<Set<IAssignedTask>> getTasksToPreempt(
-        Iterable<IAssignedTask> possibleVictims,
-        Iterable<Offer> offers,
-        IAssignedTask pendingTask) {
-
-      // This enforces the precondition that all of the resources are from the same host. We need to
-      // get the host for the schedulingFilter.
-      Set<String> hosts = ImmutableSet.<String>builder()
-          .addAll(Iterables.transform(possibleVictims, TASK_TO_HOST))
-          .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
-
-      String host = Iterables.getOnlyElement(hosts);
-
-      ResourceSlot slackResources =
-          ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT));
-
-      if (!Iterables.isEmpty(offers)) {
-        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
-            slackResources,
-            host,
-            pendingTask.getTask(),
-            pendingTask.getTaskId());
-
-        if (vetos.isEmpty()) {
-          return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
-        }
-      }
-
-      FluentIterable<IAssignedTask> preemptableTasks =
-          FluentIterable.from(possibleVictims).filter(canPreempt(pendingTask));
-
-      if (preemptableTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      List<IAssignedTask> toPreemptTasks = Lists.newArrayList();
-
-      Iterable<IAssignedTask> sortedVictims = RESOURCE_ORDER.immutableSortedCopy(preemptableTasks);
-
-      for (IAssignedTask victim : sortedVictims) {
-        toPreemptTasks.add(victim);
-
-        ResourceSlot totalResource = ResourceSlot.sum(
-            ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
-            slackResources);
-
-        Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
-            totalResource,
-            host,
-            pendingTask.getTask(),
-            pendingTask.getTaskId());
-
-        if (vetos.isEmpty()) {
-          return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
-        }
-      }
-      return Optional.absent();
-    }
-
-    private static final Function<Offer, String> OFFER_TO_SLAVE_ID =
-        new Function<Offer, String>() {
-          @Override public String apply(Offer offer) {
-            return offer.getSlaveId().getValue();
-          }
-        };
-
-    private Multimap<String, IAssignedTask> getSlavesToActiveTasks() {
-      // Only non-pending active tasks may be preempted.
-      List<IAssignedTask> activeTasks = fetch(CANDIDATE_QUERY);
-
-      // Walk through the preemption candidates in reverse scheduling order.
-      Collections.sort(activeTasks, Tasks.SCHEDULING_ORDER.reverse());
-
-      // Group the tasks by slave id so they can be paired with offers from the same slave.
-      return Multimaps.index(activeTasks, TASK_TO_SLAVE_ID);
-    }
-
-    @Override
-    public synchronized Optional<String> findPreemptionSlotFor(String taskId) {
-      List<IAssignedTask> pendingTasks =
-          fetch(Query.statusScoped(PENDING).byId(taskId), isIdleTask);
-
-      // Task is no longer PENDING no need to preempt
-      if (pendingTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      IAssignedTask pendingTask = Iterables.getOnlyElement(pendingTasks);
-
-      Multimap<String, IAssignedTask> slavesToActiveTasks = getSlavesToActiveTasks();
-
-      if (slavesToActiveTasks.isEmpty()) {
-        return Optional.absent();
-      }
-
-      attemptedPreemptions.incrementAndGet();
-
-      // Group the offers by slave id so they can be paired with active tasks from the same slave.
-      Multimap<String, Offer> slavesToOffers =
-          Multimaps.index(offerQueue.getOffers(), OFFER_TO_SLAVE_ID);
-
-      Set<String> allSlaves = ImmutableSet.<String>builder()
-          .addAll(slavesToOffers.keySet())
-          .addAll(slavesToActiveTasks.keySet())
-          .build();
-
-      for (String slaveID : allSlaves) {
-        Optional<Set<IAssignedTask>> toPreemptTasks = getTasksToPreempt(
-            slavesToActiveTasks.get(slaveID),
-            slavesToOffers.get(slaveID),
-            pendingTask);
-
-        if (toPreemptTasks.isPresent()) {
-          try {
-            for (IAssignedTask toPreempt : toPreemptTasks.get()) {
-              scheduler.preemptTask(toPreempt, pendingTask);
-              tasksPreempted.incrementAndGet();
-            }
-            return Optional.of(slaveID);
-          } catch (ScheduleException e) {
-            LOG.log(Level.SEVERE, "Preemption failed", e);
-            failedPreemptions.incrementAndGet();
-          }
-        }
-      }
-
-      noSlotsFound.incrementAndGet();
-      return Optional.absent();
-    }
-
-    private static final Predicate<IAssignedTask> IS_PRODUCTION =
-        Predicates.compose(Tasks.IS_PRODUCTION, Tasks.ASSIGNED_TO_INFO);
-
-    /**
-     * Creates a static filter that will identify tasks that may preempt the provided task.
-     * A task may preempt another task if the following conditions hold true:
-     * - The resources reserved for {@code preemptableTask} are sufficient to satisfy the task.
-     * - The tasks are owned by the same user and the priority of {@code preemptableTask} is lower
-     *     OR {@code preemptableTask} is non-production and the compared task is production.
-     *
-     * @param preemptableTask Task to possibly preempt.
-     * @return A filter that will compare the priorities and resources required by other tasks
-     *     with {@code preemptableTask}.
-     */
-    private static Predicate<IAssignedTask> preemptionFilter(IAssignedTask preemptableTask) {
-      Predicate<IAssignedTask> preemptableIsProduction = preemptableTask.getTask().isProduction()
-          ? Predicates.<IAssignedTask>alwaysTrue()
-          : Predicates.<IAssignedTask>alwaysFalse();
-
-      Predicate<IAssignedTask> priorityFilter =
-          greaterPriorityFilter(GET_PRIORITY.apply(preemptableTask));
-      return Predicates.or(
-          Predicates.and(Predicates.not(preemptableIsProduction), IS_PRODUCTION),
-          Predicates.and(isOwnedBy(getRole(preemptableTask)), priorityFilter)
-      );
-    }
-
-    private static Predicate<IAssignedTask> isOwnedBy(final String role) {
-      return new Predicate<IAssignedTask>() {
-        @Override public boolean apply(IAssignedTask task) {
-          return getRole(task).equals(role);
-        }
-      };
-    }
-
-    private static String getRole(IAssignedTask task) {
-      return task.getTask().getOwner().getRole();
-    }
-
-    private static Predicate<Integer> greaterThan(final int value) {
-      return new Predicate<Integer>() {
-        @Override public boolean apply(Integer input) {
-          return input > value;
-        }
-      };
-    }
-
-    private static Predicate<IAssignedTask> greaterPriorityFilter(int priority) {
-      return Predicates.compose(greaterThan(priority), GET_PRIORITY);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
deleted file mode 100644
index eefc03a..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/RescheduleCalculator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.Random;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
-import static com.twitter.aurora.gen.ScheduleStatus.RESTARTING;
-
-/**
- * Calculates scheduling delays for tasks.
- */
-interface RescheduleCalculator {
-  /**
-   * Gets a timestamp for the task to become eligible for (re)scheduling at scheduler startup.
-   *
-   * @param task Task to calculate timestamp for.
-   * @return Timestamp in msec.
-   */
-  long getStartupReadyTimeMs(IScheduledTask task);
-
-  /**
-   * Gets a timestamp for the task to become eligible for (re)scheduling.
-   *
-   * @param task Task to calculate timestamp for.
-   * @return Timestamp in msec.
-   */
-  long getReadyTimeMs(IScheduledTask task);
-
-  class RescheduleCalculatorImpl implements RescheduleCalculator {
-
-    private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
-    private final Storage storage;
-    private final RescheduleCalculatorSettings settings;
-    private final Clock clock;
-    private final Random random = new Random.SystemRandom(new java.util.Random());
-
-    private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
-        Predicates.in(Tasks.ACTIVE_STATES);
-
-    private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
-        new Function<ITaskEvent, ScheduleStatus>() {
-          @Override public ScheduleStatus apply(ITaskEvent input) {
-            return input.getStatus();
-          }
-        };
-
-    private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
-        EnumSet.of(RESTARTING, KILLING);
-
-    private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() {
-      @Override public boolean apply(IScheduledTask task) {
-        if (!task.isSetTaskEvents()) {
-          return false;
-        }
-
-        List<ITaskEvent> events = Lists.reverse(task.getTaskEvents());
-
-        // Avoid penalizing tasks that were interrupted by outside action, such as a user
-        // restarting them.
-        if (Iterables.any(Iterables.transform(events, TO_STATUS),
-            Predicates.in(INTERRUPTED_TASK_STATES))) {
-          return false;
-        }
-
-        ITaskEvent terminalEvent = Iterables.get(events, 0);
-        ScheduleStatus terminalState = terminalEvent.getStatus();
-        Preconditions.checkState(Tasks.isTerminated(terminalState));
-
-        ITaskEvent activeEvent =
-            Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
-
-        long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
-
-        return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs;
-      }
-    };
-
-    static class RescheduleCalculatorSettings {
-      private final BackoffStrategy flappingTaskBackoff;
-      private final Amount<Long, Time> flappingTaskThreashold;
-      private final Amount<Integer, Time>  maxStartupRescheduleDelay;
-
-      RescheduleCalculatorSettings(
-          BackoffStrategy flappingTaskBackoff,
-          Amount<Long, Time> flappingTaskThreashold,
-          Amount<Integer, Time> maxStartupRescheduleDelay) {
-
-        this.flappingTaskBackoff = checkNotNull(flappingTaskBackoff);
-        this.flappingTaskThreashold = checkNotNull(flappingTaskThreashold);
-        this.maxStartupRescheduleDelay = checkNotNull(maxStartupRescheduleDelay);
-      }
-    }
-
-    @Inject
-    RescheduleCalculatorImpl(
-        Storage storage,
-        RescheduleCalculatorSettings settings,
-        Clock clock) {
-
-      this.storage = checkNotNull(storage);
-      this.settings = checkNotNull(settings);
-      this.clock = checkNotNull(clock);
-    }
-
-    @Override
-    public long getStartupReadyTimeMs(IScheduledTask task) {
-      return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS))
-          + getTaskReadyTimestamp(task);
-    }
-
-    @Override
-    public long getReadyTimeMs(IScheduledTask task) {
-      return getTaskReadyTimestamp(task);
-    }
-
-    private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) {
-      if (!task.isSetAncestorId()) {
-        return Optional.absent();
-      }
-
-      ImmutableSet<IScheduledTask> res =
-          Storage.Util.weaklyConsistentFetchTasks(storage, Query.taskScoped(task.getAncestorId()));
-
-      return Optional.fromNullable(Iterables.getOnlyElement(res, null));
-    }
-
-    private long getTaskReadyTimestamp(IScheduledTask task) {
-      Optional<IScheduledTask> curTask = getTaskAncestor(task);
-      long penaltyMs = 0;
-      while (curTask.isPresent() && flapped.apply(curTask.get())) {
-        LOG.info(
-            String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get())));
-        long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs);
-        // If the backoff strategy is truncated then there is no need for us to continue.
-        if (newPenalty == penaltyMs) {
-          break;
-        }
-        penaltyMs = newPenalty;
-        curTask = getTaskAncestor(curTask.get());
-      }
-
-      return penaltyMs + clock.nowMillis();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java
deleted file mode 100644
index 018022b..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroup.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import com.twitter.aurora.scheduler.async.TaskGroups.GroupKey;
-import com.twitter.common.base.Function;
-import com.twitter.common.util.BackoffStrategy;
-
-/**
- * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire.
- */
-class TaskGroup {
-  private final GroupKey key;
-  private final BackoffStrategy backoffStrategy;
-
-  private static final Function<Task, Long> TO_TIMESTAMP = new Function<Task, Long>() {
-    @Override public Long apply(Task item) {
-      return item.readyTimestampMs;
-    }
-  };
-
-  // Order the tasks by the time they are ready to be scheduled
-  private static final Ordering<Task> TASK_ORDERING = Ordering.natural().onResultOf(TO_TIMESTAMP);
-  // 11 is the magic number used by PriorityBlockingQueue as the initial size.
-  private final Queue<Task> tasks = new PriorityBlockingQueue<>(11, TASK_ORDERING);
-  // Penalty for the task group for failing to schedule.
-  private final AtomicLong penaltyMs;
-
-  TaskGroup(GroupKey key, BackoffStrategy backoffStrategy) {
-    this.key = key;
-    this.backoffStrategy = backoffStrategy;
-    penaltyMs = new AtomicLong();
-    resetPenaltyAndGet();
-  }
-
-  GroupKey getKey() {
-    return key;
-  }
-
-  private static final Function<Task, String> TO_TASK_ID =
-      new Function<Task, String>() {
-        @Override public String apply(Task item) {
-          return item.taskId;
-        }
-      };
-
-  /**
-   * Removes the task at the head of the queue.
-   *
-   * @return String the id of the head task.
-   * @throws IllegalStateException if the queue is empty.
-   */
-  String pop() throws IllegalStateException {
-    Task head = tasks.poll();
-    Preconditions.checkState(head != null);
-    return head.taskId;
-  }
-
-  void remove(String taskId) {
-    Iterables.removeIf(tasks, Predicates.compose(Predicates.equalTo(taskId), TO_TASK_ID));
-  }
-
-  void push(final String taskId, long readyTimestamp) {
-    tasks.offer(new Task(taskId, readyTimestamp));
-  }
-
-  synchronized long resetPenaltyAndGet() {
-    penaltyMs.set(backoffStrategy.calculateBackoffMs(0));
-    return getPenaltyMs();
-  }
-
-  synchronized long penalizeAndGet() {
-    penaltyMs.set(backoffStrategy.calculateBackoffMs(getPenaltyMs()));
-    return getPenaltyMs();
-  }
-
-  GroupState isReady(long nowMs) {
-    Task task = tasks.peek();
-    if (task == null) {
-      return GroupState.EMPTY;
-    }
-
-    if (task.readyTimestampMs > nowMs) {
-      return GroupState.NOT_READY;
-    }
-    return GroupState.READY;
-  }
-  // Begin methods used for debug interfaces.
-
-  public String getName() {
-    return key.toString();
-  }
-
-  // TODO(zmanji): Return Task instances here. Can use them to display flapping penalty on web UI.
-  public Set<String> getTaskIds() {
-    return ImmutableSet.copyOf(Iterables.transform(tasks, TO_TASK_ID));
-  }
-
-  public long getPenaltyMs() {
-    return penaltyMs.get();
-  }
-
-  private static class Task {
-    private final String taskId;
-    private final long readyTimestampMs;
-
-    Task(String taskId, long readyTimestampMs) {
-      this.taskId = Preconditions.checkNotNull(taskId);
-      this.readyTimestampMs = readyTimestampMs;
-    }
-  }
-
-  enum GroupState {
-    EMPTY,      // The group is empty.
-    NOT_READY,  // Every task in the group is not ready yet.
-    READY       // The task at the head of the queue is ready.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java b/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
deleted file mode 100644
index a59e5c8..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/async/TaskGroups.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
-import static com.twitter.aurora.scheduler.async.TaskGroup.GroupState;
-
-/**
- * A collection of task groups, where a task group is a collection of tasks that are known to be
- * equal in the way they schedule. This is expected to be tasks associated with the same job key,
- * who also have {@code equal()} {@link ITaskConfig} values.
- * <p>
- * This is used to prevent redundant work in trying to schedule tasks as well as to provide
- * nearly-equal responsiveness when scheduling across jobs.  In other words, a 1000 instance job
- * cannot starve a 1 instance job.
- */
-public class TaskGroups implements EventSubscriber {
-
-  private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
-  private final Storage storage;
-  private final LoadingCache<GroupKey, TaskGroup> groups;
-  private final Clock clock;
-  private final RescheduleCalculator rescheduleCalculator;
-
-  static class TaskGroupsSettings {
-    private final BackoffStrategy taskGroupBackoff;
-    private final RateLimiter rateLimiter;
-
-    TaskGroupsSettings(BackoffStrategy taskGroupBackoff, RateLimiter rateLimiter) {
-      this.taskGroupBackoff = checkNotNull(taskGroupBackoff);
-      this.rateLimiter = checkNotNull(rateLimiter);
-    }
-  }
-
-  @Inject
-  TaskGroups(
-      ShutdownRegistry shutdownRegistry,
-      Storage storage,
-      TaskGroupsSettings settings,
-      TaskScheduler taskScheduler,
-      Clock clock,
-      RescheduleCalculator rescheduleCalculator) {
-
-    this(
-        createThreadPool(shutdownRegistry),
-        storage,
-        settings.taskGroupBackoff,
-        settings.rateLimiter,
-        taskScheduler,
-        clock,
-        rescheduleCalculator);
-  }
-
-  TaskGroups(
-      final ScheduledExecutorService executor,
-      final Storage storage,
-      final BackoffStrategy taskGroupBackoffStrategy,
-      final RateLimiter rateLimiter,
-      final TaskScheduler taskScheduler,
-      final Clock clock,
-      final RescheduleCalculator rescheduleCalculator) {
-
-    this.storage = checkNotNull(storage);
-    checkNotNull(executor);
-    checkNotNull(taskGroupBackoffStrategy);
-    checkNotNull(rateLimiter);
-    checkNotNull(taskScheduler);
-    this.clock = checkNotNull(clock);
-    this.rescheduleCalculator = checkNotNull(rescheduleCalculator);
-
-    final TaskScheduler ratelLimitedScheduler = new TaskScheduler() {
-      @Override public TaskSchedulerResult schedule(String taskId) {
-        rateLimiter.acquire();
-        return taskScheduler.schedule(taskId);
-      }
-    };
-
-    groups = CacheBuilder.newBuilder().build(new CacheLoader<GroupKey, TaskGroup>() {
-      @Override public TaskGroup load(GroupKey key) {
-        TaskGroup group = new TaskGroup(key, taskGroupBackoffStrategy);
-        LOG.info("Evaluating group " + key + " in " + group.getPenaltyMs() + " ms");
-        startGroup(group, executor, ratelLimitedScheduler);
-        return group;
-      }
-    });
-  }
-
-  private synchronized boolean maybeInvalidate(TaskGroup group) {
-    if (group.getTaskIds().isEmpty()) {
-      groups.invalidate(group.getKey());
-      return true;
-    }
-    return false;
-  }
-
-  private void startGroup(
-      final TaskGroup group,
-      final ScheduledExecutorService executor,
-      final TaskScheduler taskScheduler) {
-
-    Runnable monitor = new Runnable() {
-      @Override public void run() {
-        GroupState state = group.isReady(clock.nowMillis());
-
-        switch (state) {
-          case EMPTY:
-            maybeInvalidate(group);
-            break;
-
-          case READY:
-            String id = group.pop();
-            TaskScheduler.TaskSchedulerResult result = taskScheduler.schedule(id);
-            switch (result) {
-              case SUCCESS:
-                if (!maybeInvalidate(group)) {
-                  executor.schedule(this, group.resetPenaltyAndGet(), TimeUnit.MILLISECONDS);
-                }
-                break;
-
-              case TRY_AGAIN:
-                group.push(id, clock.nowMillis());
-                executor.schedule(this, group.penalizeAndGet(), TimeUnit.MILLISECONDS);
-                break;
-
-              default:
-                throw new IllegalStateException("Unknown TaskSchedulerResult " + result);
-            }
-            break;
-
-          case NOT_READY:
-            executor.schedule(this, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
-            break;
-
-          default:
-            throw new IllegalStateException("Unknown GroupState " + state);
-        }
-      }
-    };
-    executor.schedule(monitor, group.getPenaltyMs(), TimeUnit.MILLISECONDS);
-  }
-
-  private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
-    // TODO(William Farner): Leverage ExceptionHandlingScheduledExecutorService:
-    // com.twitter.common.util.concurrent.ExceptionHandlingScheduledExecutorService
-    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
-        1,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskScheduler-%d").build());
-    Stats.exportSize("schedule_queue_size", executor.getQueue());
-    shutdownRegistry.addAction(new Command() {
-      @Override public void execute() {
-        new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
-      }
-    });
-    return executor;
-  }
-
-  private synchronized void add(IAssignedTask task, long readyTimestamp) {
-    groups.getUnchecked(new GroupKey(task.getTask())).push(task.getTaskId(), readyTimestamp);
-  }
-
-  /**
-   * Informs the task groups of a task state change.
-   * <p>
-   * This is used to observe {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks and begin
-   * attempting to schedule them.
-   *
-   * @param stateChange State change notification.
-   */
-  @Subscribe
-  public synchronized void taskChangedState(TaskStateChange stateChange) {
-    if (stateChange.getNewState() == PENDING) {
-      add(
-          stateChange.getTask().getAssignedTask(),
-          rescheduleCalculator.getReadyTimeMs(stateChange.getTask()));
-    }
-  }
-
-  /**
-   * Signals that storage has started and is consistent.
-   * <p>
-   * Upon this signal, all {@link com.twitter.aurora.gen.ScheduleStatus#PENDING} tasks in the stoage
-   * will become eligible for scheduling.
-   *
-   * @param event Storage started notification.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask task
-        : Storage.Util.consistentFetchTasks(storage, Query.unscoped().byStatus(PENDING))) {
-
-      add(task.getAssignedTask(), rescheduleCalculator.getStartupReadyTimeMs(task));
-    }
-  }
-
-  /**
-   * Signals the scheduler that tasks have been deleted.
-   *
-   * @param deleted Tasks deleted event.
-   */
-  @Subscribe
-  public synchronized void tasksDeleted(TasksDeleted deleted) {
-    for (IAssignedTask task
-        : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) {
-      TaskGroup group = groups.getIfPresent(new GroupKey(task.getTask()));
-      if (group != null) {
-        group.remove(task.getTaskId());
-      }
-    }
-  }
-
-  public Iterable<TaskGroup> getGroups() {
-    return ImmutableSet.copyOf(groups.asMap().values());
-  }
-
-  static class GroupKey {
-    private final ITaskConfig canonicalTask;
-
-    GroupKey(ITaskConfig task) {
-      this.canonicalTask = task;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(canonicalTask);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof GroupKey)) {
-        return false;
-      }
-      GroupKey other = (GroupKey) o;
-      return Objects.equal(canonicalTask, other.canonicalTask);
-    }
-
-    @Override
-    public String toString() {
-      return JobKeys.toPath(Tasks.INFO_TO_JOB_KEY.apply(canonicalTask));
-    }
-  }
-}