You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:43 UTC

[50/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java b/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java
deleted file mode 100644
index 2443078..0000000
--- a/src/main/java/com/twitter/aurora/codec/ThriftBinaryCodec.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.codec;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-/**
- * Static helpers that convert thrift structs to and from their binary-protocol byte form.
- */
-public final class ThriftBinaryCodec {
-
-  /**
-   * The one protocol factory shared by every encode and decode call in this class.
-   */
-  public static final TProtocolFactory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
-
-  private ThriftBinaryCodec() {
-    // Not instantiable; every member is static.
-  }
-
-  /**
-   * Null-tolerant variant of {@link #decodeNonNull(Class, byte[])}.
-   *
-   * @param clazz Type to instantiate and populate.
-   * @param buffer Serialized bytes, possibly {@code null}.
-   * @param <T> Target type.
-   * @return The decoded message, or {@code null} when {@code buffer} is {@code null}.
-   * @throws CodingException If the bytes could not be decoded.
-   */
-  @Nullable
-  public static <T extends TBase<T, ?>> T decode(Class<T> clazz, @Nullable byte[] buffer)
-      throws CodingException {
-
-    return (buffer == null) ? null : decodeNonNull(clazz, buffer);
-  }
-
-  /**
-   * Instantiates the target type through its no-arg constructor and fills it from the
-   * binary-encoded bytes.
-   *
-   * @param clazz Type to instantiate and populate.
-   * @param buffer Serialized bytes.
-   * @param <T> Target type.
-   * @return The decoded message.
-   * @throws CodingException If the target type could not be instantiated or the bytes could not
-   *     be deserialized.
-   */
-  public static <T extends TBase<T, ?>> T decodeNonNull(Class<T> clazz, byte[] buffer)
-      throws CodingException {
-
-    Preconditions.checkNotNull(clazz);
-    Preconditions.checkNotNull(buffer);
-
-    // Step one: create the empty target object.
-    final T message;
-    try {
-      message = clazz.newInstance();
-    } catch (IllegalAccessException e) {
-      throw new CodingException("Failed to access constructor for target type.", e);
-    } catch (InstantiationException e) {
-      throw new CodingException("Failed to instantiate target type.", e);
-    }
-
-    // Step two: populate it from the buffer.
-    try {
-      TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
-      deserializer.deserialize(message, buffer);
-    } catch (TException e) {
-      throw new CodingException("Failed to deserialize thrift object.", e);
-    }
-    return message;
-  }
-
-  /**
-   * Null-tolerant variant of {@link #encodeNonNull(TBase)}.
-   *
-   * @param tBase Object to encode, possibly {@code null}.
-   * @return The encoded bytes, or {@code null} when {@code tBase} is {@code null}.
-   * @throws CodingException If the object could not be encoded.
-   */
-  @Nullable
-  public static byte[] encode(@Nullable TBase<?, ?> tBase) throws CodingException {
-    return (tBase == null) ? null : encodeNonNull(tBase);
-  }
-
-  /**
-   * Serializes a thrift object into a byte array using {@link #PROTOCOL_FACTORY}.
-   *
-   * @param tBase Object to encode.
-   * @return The encoded bytes.
-   * @throws CodingException If the object could not be encoded.
-   */
-  public static byte[] encodeNonNull(TBase<?, ?> tBase) throws CodingException {
-    Preconditions.checkNotNull(tBase);
-
-    try {
-      TSerializer serializer = new TSerializer(PROTOCOL_FACTORY);
-      return serializer.serialize(tBase);
-    } catch (TException e) {
-      throw new CodingException("Failed to serialize: " + tBase, e);
-    }
-  }
-
-  /**
-   * Checked exception signalling that encoding or decoding did not succeed.
-   */
-  public static class CodingException extends Exception {
-    public CodingException(String message) {
-      super(message);
-    }
-
-    public CodingException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/Driver.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/Driver.java b/src/main/java/com/twitter/aurora/scheduler/Driver.java
deleted file mode 100644
index aa77887..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/Driver.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.StateMachine;
-
-import static org.apache.mesos.Protos.Status.DRIVER_RUNNING;
-
-/**
- * Wraps the mesos Scheduler driver to ensure it is used in a valid lifecycle; namely:
- * <pre>
- *   (run -> kill*)? -> stop*
- * </pre>
- *
- * Also ensures the driver is only asked for when needed.
- */
-public interface Driver {
-
-  /**
-   * Launches a task.
-   *
-   * @param offerId ID of the resource offer to accept with the task.
-   * @param task Task to launch.
-   */
-  void launchTask(OfferID offerId, TaskInfo task);
-
-  /**
-   * Declines a resource offer.
-   *
-   * @param offerId ID of the offer to decline.
-   */
-  void declineOffer(OfferID offerId);
-
-  /**
-   * Sends a kill task request for the given {@code taskId} to the mesos master.
-   *
-   * @param taskId The id of the task to kill.
-   */
-  void killTask(String taskId);
-
-  /**
-   * Stops the underlying driver if it is running, otherwise does nothing.
-   */
-  void stop();
-
-  /**
-   * Starts the underlying driver.  Can only be called once.
-   *
-   * @return The status of the underlying driver run request.
-   */
-  Protos.Status start();
-
-  /**
-   * Blocks until the underlying driver is stopped or aborted.
-   *
-   * @return The status of the underlying driver upon exit.
-   */
-  Protos.Status join();
-
-  /**
-   * Mesos driver implementation.
-   *
-   * <p>Every operation first checks the lifecycle state machine and only then asks the supplier
-   * for the underlying {@link SchedulerDriver}.
-   */
-  static class DriverImpl implements Driver {
-    private static final Logger LOG = Logger.getLogger(Driver.class.getName());
-
-    /**
-     * Driver states.
-     */
-    enum State {
-      INIT,
-      RUNNING,
-      STOPPED
-    }
-
-    private final StateMachine<State> stateMachine;
-    private final Supplier<Optional<SchedulerDriver>> driverSupplier;
-    // Incremented each time a kill request returns a status other than DRIVER_RUNNING.
-    private final AtomicLong killFailures = Stats.exportLong("scheduler_driver_kill_failures");
-
-    /**
-     * Creates a driver manager that will only ask for the underlying mesos driver when actually
-     * needed.
-     *
-     * @param driverSupplier A factory for the underlying driver.
-     */
-    @Inject
-    DriverImpl(Supplier<Optional<SchedulerDriver>> driverSupplier) {
-      this.driverSupplier = driverSupplier;
-      // Starts in INIT and is configured to throw on a bad transition.
-      // NOTE(review): addState presumably declares (from, to...) edges, i.e.
-      // INIT -> {RUNNING, STOPPED} and RUNNING -> STOPPED; confirm against StateMachine.
-      this.stateMachine =
-          StateMachine.<State>builder("scheduler_driver")
-              .initialState(State.INIT)
-              .addState(State.INIT, State.RUNNING, State.STOPPED)
-              .addState(State.RUNNING, State.STOPPED)
-              .logTransitions()
-              .throwOnBadTransition(true)
-              .build();
-    }
-
-    /**
-     * Checks that the state machine is in the {@code expected} state, then fetches the driver
-     * from the supplier.
-     *
-     * @param expected State the caller requires.
-     * @return The underlying driver.
-     */
-    private synchronized SchedulerDriver get(State expected) {
-      // TODO(William Farner): Formalize the failure case here by throwing a checked exception.
-      stateMachine.checkState(expected);
-      // This will and should fail if the driver is not present.
-      return driverSupplier.get().get();
-    }
-
-    @Override
-    public void launchTask(OfferID offerId, TaskInfo task) {
-      // Requires RUNNING; the single task is wrapped in a list for launchTasks.
-      get(State.RUNNING).launchTasks(offerId, ImmutableList.of(task));
-    }
-
-    @Override
-    public void declineOffer(OfferID offerId) {
-      get(State.RUNNING).declineOffer(offerId);
-    }
-
-    @Override
-    public Protos.Status start() {
-      // Requires INIT. The transition to RUNNING is recorded before the underlying driver is
-      // actually started.
-      SchedulerDriver driver = get(State.INIT);
-      stateMachine.transition(State.RUNNING);
-      return driver.start();
-    }
-
-    @Override
-    public Status join() {
-      return get(State.RUNNING).join();
-    }
-
-    @Override
-    public synchronized void stop() {
-      // Does nothing unless currently RUNNING; the driver is stopped first and the state is
-      // moved to STOPPED afterwards.
-      if (stateMachine.getState() == State.RUNNING) {
-        SchedulerDriver driver = get(State.RUNNING);
-        driver.stop(true /* failover */);
-        stateMachine.transition(State.STOPPED);
-      }
-    }
-
-    @Override
-    public void killTask(String taskId) {
-      SchedulerDriver driver = get(State.RUNNING);
-      Protos.Status status = driver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
-
-      // Any status other than DRIVER_RUNNING is logged and counted as a failed kill; no
-      // exception is raised to the caller.
-      if (status != DRIVER_RUNNING) {
-        LOG.severe(String.format("Attempt to kill task %s failed with code %s",
-            taskId, status));
-        killFailures.incrementAndGet();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java b/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java
deleted file mode 100644
index e39bb09..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/DriverFactory.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-import javax.inject.Provider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.protobuf.ByteString;
-
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos.Credential;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * Factory to create scheduler driver instances.
- *
- * <p>The function argument is the persisted framework ID, or {@code null} when none is available.
- */
-public interface DriverFactory extends Function<String, SchedulerDriver> {
-
-  /**
-   * Factory that builds a {@link MesosSchedulerDriver} configured from command line arguments.
-   */
-  static class DriverFactoryImpl implements DriverFactory {
-    private static final Logger LOG = Logger.getLogger(DriverFactory.class.getName());
-
-    @NotNull
-    @CmdLine(name = "mesos_master_address",
-        help = "Address for the mesos master, can be a socket address or zookeeper path.")
-    private static final Arg<String> MESOS_MASTER_ADDRESS = Arg.create();
-
-    @VisibleForTesting
-    static final String PRINCIPAL_KEY = "aurora_authentication_principal";
-    @VisibleForTesting
-    static final String SECRET_KEY = "aurora_authentication_secret";
-    // The help text previously read "...authenticate with Mesosmaster." because the space
-    // between the two concatenated fragments was missing.
-    @CmdLine(name = "framework_authentication_file",
-        help = "Properties file which contains framework credentials to authenticate with "
-            + "Mesos master. Must contain the properties '" + PRINCIPAL_KEY + "' and "
-            + "'" + SECRET_KEY + "'.")
-    private static final Arg<File> FRAMEWORK_AUTHENTICATION_FILE = Arg.create();
-
-    @CmdLine(name = "framework_failover_timeout",
-        help = "Time after which a framework is considered deleted.  SHOULD BE VERY HIGH.")
-    private static final Arg<Amount<Long, Time>> FRAMEWORK_FAILOVER_TIMEOUT =
-        Arg.create(Amount.of(21L, Time.DAYS));
-
-    /**
-     * Require Mesos slaves to have checkpointing enabled. Slaves with checkpointing enabled will
-     * attempt to write checkpoints when required by a task's framework. These checkpoints allow
-     * executors to be reattached rather than killed when a slave is restarted.
-     *
-     * This flag is dangerous! When enabled tasks will not launch on slaves without checkpointing
-     * enabled.
-     *
-     * Behavior is as follows:
-     * (Scheduler -require_slave_checkpoint=true,  Slave --checkpoint=true):
-     *   Tasks will launch.        Checkpoints will be written.
-     * (Scheduler -require_slave_checkpoint=true,   Slave --checkpoint=false):
-     *   Tasks WILL NOT launch.
-     * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=true):
-     *   Tasks will launch.        Checkpoints will not be written.
-     * (Scheduler -require_slave_checkpoint=false,  Slave --checkpoint=false):
-     *   Tasks will launch.        Checkpoints will not be written.
-     *
-     * TODO(ksweeney): Remove warning table after https://issues.apache.org/jira/browse/MESOS-444
-     * is resolved.
-     */
-    @CmdLine(name = "require_slave_checkpoint",
-        help = "DANGEROUS! Require Mesos slaves to have checkpointing enabled. When enabled a "
-            + "slave restart should not kill executors, but the scheduler will not be able to "
-            + "launch tasks on slaves without --checkpoint=true in their command lines. See "
-            + "DriverFactory.java for more information.")
-    private static final Arg<Boolean> REQUIRE_SLAVE_CHECKPOINT = Arg.create(false);
-
-    private static final String EXECUTOR_USER = "root";
-
-    private static final String TWITTER_FRAMEWORK_NAME = "TwitterScheduler";
-
-    private final Provider<Scheduler> scheduler;
-
-    /**
-     * Creates a factory.
-     *
-     * @param scheduler Provider of the scheduler callback handed to each new driver.
-     */
-    @Inject
-    DriverFactoryImpl(Provider<Scheduler> scheduler) {
-      this.scheduler = Preconditions.checkNotNull(scheduler);
-    }
-
-    /**
-     * Loads framework credentials from a properties stream and verifies that both
-     * {@link #PRINCIPAL_KEY} and {@link #SECRET_KEY} are present.
-     *
-     * <p>The stream is not closed by this method; that is the caller's responsibility.
-     *
-     * @param credentialsStream Stream of properties-formatted credentials.
-     * @return The loaded properties.
-     * @throws IllegalStateException If either required key is missing.
-     */
-    @VisibleForTesting
-    static Properties parseCredentials(InputStream credentialsStream) {
-      Properties properties = new Properties();
-      try {
-        properties.load(credentialsStream);
-      } catch (IOException e) {
-        LOG.severe("Unable to load authentication file");
-        throw Throwables.propagate(e);
-      }
-      Preconditions.checkState(properties.containsKey(PRINCIPAL_KEY),
-          "The framework authentication file is missing the key: %s", PRINCIPAL_KEY);
-      Preconditions.checkState(properties.containsKey(SECRET_KEY),
-          "The framework authentication file is missing the key: %s", SECRET_KEY);
-      return properties;
-    }
-
-    /**
-     * Opens the credentials file, parses it, and always closes the file handle afterwards.
-     *
-     * @param credentialsFile Properties file holding the framework credentials.
-     * @return The validated credentials.
-     */
-    private static Properties loadCredentials(File credentialsFile) {
-      InputStream credentialsStream;
-      try {
-        credentialsStream = new FileInputStream(credentialsFile);
-      } catch (FileNotFoundException e) {
-        LOG.severe("Authentication File not Found");
-        throw Throwables.propagate(e);
-      }
-
-      try {
-        return parseCredentials(credentialsStream);
-      } finally {
-        // Close on both the success and failure paths; previously the stream was leaked.
-        try {
-          credentialsStream.close();
-        } catch (IOException e) {
-          // A failed close must not mask the parse result or a parse failure.
-          LOG.warning("Failed to close authentication file: " + e);
-        }
-      }
-    }
-
-    /**
-     * Builds a scheduler driver for the configured mesos master.
-     *
-     * <p>When the {@code framework_authentication_file} argument was supplied, the driver is
-     * created with the credentials read from that file; otherwise it is created without
-     * authentication.
-     *
-     * @param frameworkId Persisted framework ID, or {@code null} to connect as a new framework.
-     * @return A new scheduler driver.
-     */
-    @Override
-    public SchedulerDriver apply(@Nullable String frameworkId) {
-      LOG.info("Connecting to mesos master: " + MESOS_MASTER_ADDRESS.get());
-
-      FrameworkInfo.Builder frameworkInfo = FrameworkInfo.newBuilder()
-          .setUser(EXECUTOR_USER)
-          .setName(TWITTER_FRAMEWORK_NAME)
-          .setCheckpoint(REQUIRE_SLAVE_CHECKPOINT.get())
-          .setFailoverTimeout(FRAMEWORK_FAILOVER_TIMEOUT.get().as(Time.SECONDS));
-
-      if (frameworkId != null) {
-        LOG.info("Found persisted framework ID: " + frameworkId);
-        frameworkInfo.setId(FrameworkID.newBuilder().setValue(frameworkId));
-      } else {
-        LOG.warning("Did not find a persisted framework ID, connecting as a new framework.");
-      }
-
-      if (FRAMEWORK_AUTHENTICATION_FILE.hasAppliedValue()) {
-        Properties properties = loadCredentials(FRAMEWORK_AUTHENTICATION_FILE.get());
-
-        // Only the principal is logged; the secret never is.
-        LOG.info(String.format("Connecting to master using authentication (principal: %s).",
-            properties.get(PRINCIPAL_KEY)));
-
-        Credential credential = Credential.newBuilder()
-            .setPrincipal(properties.getProperty(PRINCIPAL_KEY))
-            .setSecret(ByteString.copyFromUtf8(properties.getProperty(SECRET_KEY)))
-            .build();
-
-        return new MesosSchedulerDriver(
-            scheduler.get(),
-            frameworkInfo.build(),
-            MESOS_MASTER_ADDRESS.get(),
-            credential);
-      } else {
-        LOG.warning("Connecting to master without authentication!");
-        return new MesosSchedulerDriver(
-            scheduler.get(),
-            frameworkInfo.build(),
-            MESOS_MASTER_ADDRESS.get());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java b/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java
deleted file mode 100644
index fb41405..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/MesosSchedulerImpl.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.aurora.GuiceUtils.AllowUnchecked;
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.gen.comm.SchedulerMessage;
-import com.twitter.aurora.scheduler.base.Conversions;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-import com.twitter.aurora.scheduler.state.SchedulerCore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Location for communication with mesos.
- *
- * <p>Implements the mesos {@link Scheduler} callbacks: persists the framework ID and host
- * attributes, hands offers and status updates to the configured task launchers, and routes
- * framework messages to the scheduler core.
- */
-class MesosSchedulerImpl implements Scheduler {
-  private static final Logger LOG = Logger.getLogger(MesosSchedulerImpl.class.getName());
-
-  private final AtomicLong resourceOffers = Stats.exportLong("scheduler_resource_offers");
-  private final AtomicLong failedOffers = Stats.exportLong("scheduler_failed_offers");
-  // NOTE(review): this counter is only incremented for unhandled status updates, yet it is
-  // exported as "scheduler_status_updates"; confirm whether the stat name is intentional.
-  private final AtomicLong failedStatusUpdates = Stats.exportLong("scheduler_status_updates");
-  private final AtomicLong frameworkDisconnects =
-      Stats.exportLong("scheduler_framework_disconnects");
-  private final AtomicLong frameworkReregisters =
-      Stats.exportLong("scheduler_framework_reregisters");
-  private final AtomicLong lostExecutors = Stats.exportLong("scheduler_lost_executors");
-
-  private final List<TaskLauncher> taskLaunchers;
-
-  private final Storage storage;
-  private final SchedulerCore schedulerCore;
-  private final Lifecycle lifecycle;
-  // Set once registered() has completed; resourceOffers() refuses to run before that.
-  private volatile boolean registered = false;
-
-  /**
-   * Creates a new scheduler.
-   *
-   * @param storage Store used to persist the framework ID and host attributes.
-   * @param schedulerCore Core scheduler.
-   * @param lifecycle Application lifecycle manager.
-   * @param taskLaunchers Task launchers.
-   */
-  @Inject
-  public MesosSchedulerImpl(
-      Storage storage,
-      SchedulerCore schedulerCore,
-      final Lifecycle lifecycle,
-      List<TaskLauncher> taskLaunchers) {
-
-    this.storage = checkNotNull(storage);
-    this.schedulerCore = checkNotNull(schedulerCore);
-    this.lifecycle = checkNotNull(lifecycle);
-    this.taskLaunchers = checkNotNull(taskLaunchers);
-  }
-
-  @Override
-  public void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveId) {
-    // Only logged; no state is changed here.
-    LOG.info("Received notification of lost slave: " + slaveId);
-  }
-
-  @SendNotification(after = Event.DriverRegistered)
-  @Override
-  public void registered(
-      SchedulerDriver driver,
-      final FrameworkID frameworkId,
-      MasterInfo masterInfo) {
-
-    LOG.info("Registered with ID " + frameworkId + ", master: " + masterInfo);
-
-    // Persist the framework ID before marking this scheduler as registered.
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue());
-      }
-    });
-    registered = true;
-  }
-
-  @SendNotification(after = Event.DriverDisconnected)
-  @Override
-  public void disconnected(SchedulerDriver schedulerDriver) {
-    LOG.warning("Framework disconnected.");
-    frameworkDisconnects.incrementAndGet();
-  }
-
-  @Override
-  public void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) {
-    LOG.info("Framework re-registered with master " + masterInfo);
-    frameworkReregisters.incrementAndGet();
-  }
-
-  /**
-   * Tests whether the resources in an offer are greater than or equal to those a task requests.
-   *
-   * @param task Task whose resource list is checked.
-   * @param offer Offer the task would be launched against.
-   * @return {@code true} if the offer covers the task's resources.
-   */
-  private static boolean fitsInOffer(TaskInfo task, Offer offer) {
-    return Resources.from(offer).greaterThanOrEqual(Resources.from(task.getResourcesList()));
-  }
-
-  @Timed("scheduler_resource_offers")
-  @Override
-  public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
-    Preconditions.checkState(registered, "Must be registered before receiving offers.");
-
-    for (final Offer offer : offers) {
-      log(Level.FINE, "Received offer: %s", offer);
-      resourceOffers.incrementAndGet();
-      // Record the host attributes carried by the offer before trying to use it.
-      storage.write(new MutateWork.NoResult.Quiet() {
-        @Override protected void execute(MutableStoreProvider storeProvider) {
-          storeProvider.getAttributeStore().saveHostAttributes(Conversions.getAttributes(offer));
-        }
-      });
-
-      // Ordering of task launchers is important here, since offers are consumed greedily.
-      // TODO(William Farner): Refactor this area of code now that the primary task launcher
-      // is asynchronous.
-      for (TaskLauncher launcher : taskLaunchers) {
-        Optional<TaskInfo> task = Optional.absent();
-        try {
-          task = launcher.createTask(offer);
-        } catch (SchedulerException e) {
-          // A failing launcher is counted and skipped; the next launcher still gets the offer.
-          LOG.log(Level.WARNING, "Failed to schedule offers.", e);
-          failedOffers.incrementAndGet();
-        }
-
-        if (task.isPresent()) {
-          if (fitsInOffer(task.get(), offer)) {
-            // The first fitting task consumes the offer; later launchers are not consulted.
-            driver.launchTasks(offer.getId(), ImmutableList.of(task.get()));
-            break;
-          } else {
-            LOG.warning("Insufficient resources to launch task " + task);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerId) {
-    LOG.info("Offer rescinded: " + offerId);
-    // Every launcher is told to cancel the offer.
-    for (TaskLauncher launcher : taskLaunchers) {
-      launcher.cancelOffer(offerId);
-    }
-  }
-
-  @AllowUnchecked
-  @Timed("scheduler_status_update")
-  @Override
-  public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
-    String info = status.hasData() ? status.getData().toStringUtf8() : null;
-    String infoMsg = info != null ? " with info " + info : "";
-    String coreMsg = status.hasMessage() ? " with core message " + status.getMessage() : "";
-    LOG.info("Received status update for task " + status.getTaskId().getValue()
-        + " in state " + status.getState() + infoMsg + coreMsg);
-
-    try {
-      // Launchers are tried in order; the first one that reports handling the update ends the
-      // call.
-      for (TaskLauncher launcher : taskLaunchers) {
-        if (launcher.statusUpdate(status)) {
-          return;
-        }
-      }
-    } catch (SchedulerException e) {
-      LOG.log(Level.SEVERE, "Status update failed due to scheduler exception: " + e, e);
-      // We re-throw the exception here in an effort to NACK the status update and trigger an
-      // abort of the driver.  Previously we directly issued driver.abort(), but the re-entrancy
-      // guarantees of that are uncertain (and we believe it was not working).  However, this
-      // was difficult to discern since logging is unreliable during JVM shutdown and we would not
-      // see the above log message.
-      throw e;
-    }
-
-    // Reached only when no launcher handled the update.
-    LOG.warning("Unhandled status update " + status);
-    failedStatusUpdates.incrementAndGet();
-  }
-
-  @Override
-  public void error(SchedulerDriver driver, String message) {
-    // A driver error is logged and then triggers an application shutdown.
-    LOG.severe("Received error message: " + message);
-    lifecycle.shutdown();
-  }
-
-  @Override
-  public void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID,
-      int status) {
-
-    LOG.info("Lost executor " + executorID);
-    lostExecutors.incrementAndGet();
-  }
-
-  @Timed("scheduler_framework_message")
-  @Override
-  public void frameworkMessage(SchedulerDriver driver, ExecutorID executor, SlaveID slave,
-      byte[] data) {
-
-    if (data == null) {
-      LOG.info("Received empty framework message.");
-      return;
-    }
-
-    try {
-      SchedulerMessage schedulerMsg = ThriftBinaryCodec.decode(SchedulerMessage.class, data);
-      if (schedulerMsg == null || !schedulerMsg.isSet()) {
-        LOG.warning("Received empty scheduler message.");
-        return;
-      }
-
-      // Only DELETED_TASKS is acted upon; any other message type is logged and dropped.
-      switch (schedulerMsg.getSetField()) {
-        case DELETED_TASKS:
-          // TODO(William Farner): Refactor this to use a thinner interface here.  As it stands
-          // it is odd that we route the registered() call to schedulerCore via the
-          // registeredListener and call the schedulerCore directly here.
-          schedulerCore.tasksDeleted(schedulerMsg.getDeletedTasks().getTaskIds());
-          break;
-
-        default:
-          LOG.warning("Received unhandled scheduler message type: " + schedulerMsg.getSetField());
-          break;
-      }
-    } catch (ThriftBinaryCodec.CodingException e) {
-      // Undecodable messages are logged and otherwise ignored.
-      LOG.log(Level.SEVERE, "Failed to decode framework message.", e);
-    }
-  }
-
-  /**
-   * Logs a formatted message, building the string only when the level is enabled.
-   *
-   * @param level Level to log at.
-   * @param message Format string as accepted by {@link String#format(String, Object...)}.
-   * @param args Format arguments.
-   */
-  private static void log(Level level, String message, Object... args) {
-    if (LOG.isLoggable(level)) {
-      LOG.log(level, String.format(message, args));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java b/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java
deleted file mode 100644
index 5f73f71..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/MesosTaskFactory.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.twitter.aurora.Protobufs;
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.scheduler.base.CommandUtil;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.quantity.Data;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * A factory to create mesos task objects.
- */
-public interface MesosTaskFactory {
-
-  /**
-   * Translates an assigned task into the mesos task object used to launch it.
-   *
-   * @param task Assigned task to translate into a task object.
-   * @param slaveId Id of the slave the task is being assigned to.
-   * @return A new task.
-   * @throws SchedulerException If the task could not be encoded.
-   */
-  TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException;
-
-  /**
-   * Holder for the (non-blank) executor path consumed by {@link MesosTaskFactoryImpl}.
-   */
-  static class ExecutorConfig {
-    private final String executorPath;
-
-    public ExecutorConfig(String executorPath) {
-      this.executorPath = checkNotBlank(executorPath);
-    }
-
-    String getExecutorPath() {
-      return executorPath;
-    }
-  }
-
-  /**
-   * Factory implementation that serializes the assigned task into the task data and attaches an
-   * executor launched from the configured executor path.
-   */
-  static class MesosTaskFactoryImpl implements MesosTaskFactory {
-    private static final Logger LOG = Logger.getLogger(MesosTaskFactoryImpl.class.getName());
-    private static final String EXECUTOR_PREFIX = "thermos-";
-
-    /**
-     * Name to associate with task executors.
-     */
-    @VisibleForTesting
-    static final String EXECUTOR_NAME = "aurora.task";
-
-    private final String executorPath;
-
-    @Inject
-    MesosTaskFactoryImpl(ExecutorConfig executorConfig) {
-      this.executorPath = executorConfig.getExecutorPath();
-    }
-
-    @VisibleForTesting
-    static ExecutorID getExecutorId(String taskId) {
-      String executorIdValue = EXECUTOR_PREFIX + taskId;
-      return ExecutorID.newBuilder().setValue(executorIdValue).build();
-    }
-
-    public static String getJobSourceName(IJobKey jobkey) {
-      return String.format("%s.%s.%s", jobkey.getRole(), jobkey.getEnvironment(), jobkey.getName());
-    }
-
-    public static String getJobSourceName(ITaskConfig task) {
-      IJobKey key = JobKeys.from(task);
-      return getJobSourceName(key);
-    }
-
-    public static String getInstanceSourceName(ITaskConfig task, int instanceId) {
-      String jobSource = getJobSourceName(task);
-      return String.format("%s.%s", jobSource, instanceId);
-    }
-
-    @Override
-    public TaskInfo createFrom(IAssignedTask task, SlaveID slaveId) throws SchedulerException {
-      checkNotNull(task);
-      byte[] serializedTask = serialize(task);
-
-      ITaskConfig config = task.getTask();
-      List<Resource> taskResources = resourcesFor(task, config);
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.fine("Setting task resources to "
-            + Iterables.transform(taskResources, Protobufs.SHORT_TOSTRING));
-      }
-
-      TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
-      taskBuilder.setName(JobKeys.toPath(Tasks.ASSIGNED_TO_JOB_KEY.apply(task)));
-      taskBuilder.setTaskId(TaskID.newBuilder().setValue(task.getTaskId()));
-      taskBuilder.setSlaveId(slaveId);
-      taskBuilder.addAllResources(taskResources);
-      taskBuilder.setData(ByteString.copyFrom(serializedTask));
-
-      taskBuilder.setExecutor(executorFor(task, config));
-      return taskBuilder.build();
-    }
-
-    // Encodes the task with the thrift codec, logging and wrapping any coding failure.
-    private static byte[] serialize(IAssignedTask task) throws SchedulerException {
-      try {
-        return ThriftBinaryCodec.encode(task.newBuilder());
-      } catch (ThriftBinaryCodec.CodingException e) {
-        LOG.log(Level.SEVERE, "Unable to serialize task.", e);
-        throw new SchedulerException("Internal error.", e);
-      }
-    }
-
-    // Resources are only derived from the config when the task has its assigned ports set;
-    // otherwise the task is given an empty resource list.
-    private static List<Resource> resourcesFor(IAssignedTask task, ITaskConfig config) {
-      if (!task.isSetAssignedPorts()) {
-        return ImmutableList.of();
-      }
-      return Resources.from(config)
-          .toResourceList(ImmutableSet.copyOf(task.getAssignedPorts().values()));
-    }
-
-    // Describes the executor for the task, with the fixed executor CPU and RAM overhead.
-    private ExecutorInfo executorFor(IAssignedTask task, ITaskConfig config) {
-      return ExecutorInfo.newBuilder()
-          .setCommand(CommandUtil.create(executorPath))
-          .setExecutorId(getExecutorId(task.getTaskId()))
-          .setName(EXECUTOR_NAME)
-          .setSource(getInstanceSourceName(config, task.getInstanceId()))
-          .addResources(Resources.makeMesosResource(Resources.CPUS, ResourceSlot.EXECUTOR_CPUS))
-          .addResources(
-              Resources.makeMesosResource(Resources.RAM_MB, ResourceSlot.EXECUTOR_RAM.as(Data.MB)))
-          .build();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java b/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java
deleted file mode 100644
index a9c14e6..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/ResourceSlot.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.Arrays;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import com.twitter.aurora.scheduler.configuration.Resources;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-
-import static org.apache.mesos.Protos.Offer;
-
-/**
- * Resource containing class that is aware of executor overhead.
- */
-public final class ResourceSlot {
-
-  /**
-   * CPU allocated for each executor.
-   */
-  @VisibleForTesting
-  static final double EXECUTOR_CPUS = 0.25;
-
-  /**
-   * RAM required for the executor.  Executors in the wild have been observed using 48-54MB RSS,
-   * setting to 128MB to be extra vigilant initially.
-   */
-  @VisibleForTesting
-  static final Amount<Long, Data> EXECUTOR_RAM = Amount.of(128L, Data.MB);
-
-  /**
-   * Orders slots by applying {@code Resources.RESOURCE_ORDER} to the wrapped resources.
-   */
-  public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() {
-    @Override public int compare(ResourceSlot left, ResourceSlot right) {
-      return Resources.RESOURCE_ORDER.compare(left.resources, right.resources);
-    }
-  };
-
-  // Exposes the wrapped resources of a slot so that slots can be summed.
-  private static final Function<ResourceSlot, Resources> UNWRAP =
-      new Function<ResourceSlot, Resources>() {
-        @Override public Resources apply(ResourceSlot slot) {
-          return slot.resources;
-        }
-      };
-
-  private final Resources resources;
-
-  private ResourceSlot(Resources resources) {
-    this.resources = resources;
-  }
-
-  /**
-   * Creates a slot sized for the task plus the executor CPU and RAM overhead.
-   *
-   * @param task Task configuration supplying CPU, RAM, disk and requested ports.
-   * @return A slot covering the task and its executor.
-   */
-  public static ResourceSlot from(ITaskConfig task) {
-    return from(
-        task.getNumCpus(),
-        Amount.of(task.getRamMb(), Data.MB),
-        Amount.of(task.getDiskMb(), Data.MB),
-        task.getRequestedPorts().size());
-  }
-
-  /**
-   * Creates a slot holding exactly the resources in the offer; no executor overhead is added.
-   *
-   * @param offer Offer to read resources from.
-   * @return A slot wrapping the offered resources.
-   */
-  public static ResourceSlot from(Offer offer) {
-    Resources offered = Resources.from(offer);
-    return new ResourceSlot(offered);
-  }
-
-  /**
-   * Creates a slot from raw quantities, adding the executor CPU and RAM overhead.
-   *
-   * @param cpu CPUs required, excluding the executor.
-   * @param ram RAM required, excluding the executor.
-   * @param disk Disk required.
-   * @param ports Number of ports required.
-   * @return A slot covering the given quantities and the executor overhead.
-   */
-  @VisibleForTesting
-  public static ResourceSlot from(double cpu,
-                                  Amount<Long, Data> ram,
-                                  Amount<Long, Data> disk,
-                                  int ports) {
-    double cpuWithExecutor = cpu + EXECUTOR_CPUS;
-    long ramMbWithExecutor = ram.as(Data.MB) + EXECUTOR_RAM.as(Data.MB);
-    Resources total =
-        new Resources(cpuWithExecutor, Amount.of(ramMbWithExecutor, Data.MB), disk, ports);
-    return new ResourceSlot(total);
-  }
-
-  public double getNumCpus() {
-    return resources.getNumCpus();
-  }
-
-  public Amount<Long, Data> getRam() {
-    return resources.getRam();
-  }
-
-  public Amount<Long, Data> getDisk() {
-    return resources.getDisk();
-  }
-
-  public int getNumPorts() {
-    return resources.getNumPorts();
-  }
-
-  public static ResourceSlot sum(ResourceSlot... rs) {
-    return sum(Arrays.asList(rs));
-  }
-
-  /**
-   * Sums the wrapped resources of the given slots into a single slot.
-   *
-   * @param rs Slots to add together.
-   * @return A slot holding the summed resources.
-   */
-  public static ResourceSlot sum(Iterable<ResourceSlot> rs) {
-    Resources total = Resources.sum(Iterables.transform(rs, UNWRAP));
-    return new ResourceSlot(total);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
deleted file mode 100644
index f90869d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerLifecycle.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.Atomics;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.StorageBackfill;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.Closures;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatImpl;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-import com.twitter.common.util.StateMachine;
-import com.twitter.common.util.StateMachine.Transition;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.SingletonService.LeaderControl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.common.zookeeper.SingletonService.LeadershipListener;
-
-/**
- * The central driver of the scheduler runtime lifecycle.  Handles the transitions from startup and
- * initialization through acting as a standby scheduler / log replica and finally to becoming the
- * scheduler leader.
- * <p>
- * The (enforced) call order to be used with this class is:
- * <ol>
- *   <li>{@link #prepare()}, to initialize the storage system.</li>
- *   <li>{@link LeadershipListener#onLeading(LeaderControl) onLeading()} on the
- *       {@link LeadershipListener LeadershipListener}
- *       returned from {@link #prepare()}, signaling that this process has exclusive control of the
- *       cluster.</li>
- *   <li>{@link #registered(DriverRegistered) registered()},
- *       indicating that registration with the mesos master has succeeded.
- *       At this point, the scheduler's presence will be announced via
- *       {@link LeaderControl#advertise() advertise()}.</li>
- * </ol>
- * If this call order is broken, calls will fail by throwing
- * {@link java.lang.IllegalStateException}.
- * <p>
- * At any point in the lifecycle, the scheduler will respond to
- * {@link LeadershipListener#onDefeated(com.twitter.common.zookeeper.ServerSet.EndpointStatus)
- * onDefeated()} by initiating a clean shutdown using {@link Lifecycle#shutdown() shutdown()}.
- * A clean shutdown will also be initiated if control actions fail during normal state transitions.
- */
-public class SchedulerLifecycle implements EventSubscriber {
-
-  private static final Logger LOG = Logger.getLogger(SchedulerLifecycle.class.getName());
-
-  /**
-   * States of the scheduler lifecycle, declared in the order the state machine built in the
-   * constructor chains them.  Every state is allowed to transition to {@code DEAD}, and
-   * {@code DEAD} may transition to itself.
-   */
-  private enum State {
-    IDLE,
-    PREPARING_STORAGE,
-    STORAGE_PREPARED,
-    LEADER_AWAITING_REGISTRATION,
-    REGISTERED_LEADER,
-    RUNNING,
-    DEAD
-  }
-
-  /**
-   * Matches transitions whose destination state is {@code DEAD}.
-   */
-  private static final Predicate<Transition<State>> IS_DEAD = new Predicate<Transition<State>>() {
-    @Override public boolean apply(Transition<State> transition) {
-      State destination = transition.getTo();
-      return State.DEAD == destination;
-    }
-  };
-
-  private static final Predicate<Transition<State>> NOT_DEAD = Predicates.not(IS_DEAD);
-
-  private final LeadershipListener leadershipListener;
-  private final AtomicBoolean registrationAcked = new AtomicBoolean(false);
-  private final AtomicReference<LeaderControl> leaderControl = Atomics.newReference();
-  private final StateMachine<State> stateMachine;
-
-  /**
-   * Production constructor: delayed work is run or scheduled on the supplied executor service,
-   * using the delays from the leading options.
-   */
-  @Inject
-  SchedulerLifecycle(
-      final DriverFactory driverFactory,
-      final NonVolatileStorage storage,
-      final Lifecycle lifecycle,
-      final Driver driver,
-      final DriverReference driverRef,
-      final LeadingOptions leadingOptions,
-      final ScheduledExecutorService executorService,
-      final Clock clock) {
-
-    this(
-        driverFactory,
-        storage,
-        lifecycle,
-        driver,
-        driverRef,
-        executorBackedActions(executorService, leadingOptions),
-        clock);
-  }
-
-  // Builds the delayed actions that run or schedule work on the given executor.
-  private static DelayedActions executorBackedActions(
-      final ScheduledExecutorService executor,
-      final LeadingOptions options) {
-
-    return new DelayedActions() {
-      @Override public void blockingDriverJoin(Runnable runnable) {
-        executor.execute(runnable);
-      }
-
-      @Override public void onAutoFailover(Runnable runnable) {
-        Amount<Long, Time> limit = options.leadingTimeLimit;
-        executor.schedule(runnable, limit.getValue(), limit.getUnit().getTimeUnit());
-      }
-
-      @Override public void onRegistrationTimeout(Runnable runnable) {
-        LOG.info(
-            "Giving up on registration in " + options.registrationDelayLimit);
-        Amount<Long, Time> limit = options.registrationDelayLimit;
-        executor.schedule(runnable, limit.getValue(), limit.getUnit().getTimeUnit());
-      }
-    };
-  }
-
-  /**
-   * Wires up the lifecycle: exports stats, defines the closures attached to state transitions,
-   * builds the state machine and creates the leadership listener returned by {@link #prepare()}.
-   * All deferred work is routed through {@code delayedActions} so tests can control it.
-   */
-  @VisibleForTesting
-  SchedulerLifecycle(
-      final DriverFactory driverFactory,
-      final NonVolatileStorage storage,
-      final Lifecycle lifecycle,
-      // TODO(wfarner): The presence of Driver and DriverReference is quite confusing.  Figure out
-      //                a clean way to collapse the duties of DriverReference into DriverImpl.
-      final Driver driver,
-      final DriverReference driverRef,
-      final DelayedActions delayedActions,
-      final Clock clock) {
-
-    // Export 1/0 stats for the registration flag and for each lifecycle state.
-    Stats.export(new StatImpl<Integer>("framework_registered") {
-      @Override public Integer read() {
-        return registrationAcked.get() ? 1 : 0;
-      }
-    });
-    for (final State state : State.values()) {
-      Stats.export(new StatImpl<Integer>("scheduler_lifecycle_" + state) {
-        @Override public Integer read() {
-          // NOTE(review): stateMachine is assigned further down in this constructor; this
-          // presumably relies on read() not being invoked before construction completes.
-          return (state == stateMachine.getState()) ? 1 : 0;
-        }
-      });
-    }
-
-    // Attached below to the IDLE state (non-DEAD transitions only): prepares storage and
-    // advances to STORAGE_PREPARED, or moves to DEAD and rethrows if preparation fails.
-    final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> transition) {
-        try {
-          storage.prepare();
-          stateMachine.transition(State.STORAGE_PREPARED);
-        } catch (RuntimeException e) {
-          stateMachine.transition(State.DEAD);
-          throw e;
-        }
-      }
-    };
-
-    // Attached below to the STORAGE_PREPARED state (non-DEAD transitions only): starts storage,
-    // creates the driver, arms the registration timeout and auto-failover, then starts the driver.
-    final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> transition) {
-        LOG.info("Elected as leading scheduler!");
-        storage.start(new MutateWork.NoResult.Quiet() {
-          @Override protected void execute(MutableStoreProvider storeProvider) {
-            StorageBackfill.backfill(storeProvider, clock);
-          }
-        });
-
-        @Nullable final String frameworkId = storage.consistentRead(
-            new Work.Quiet<String>() {
-              @Override public String apply(StoreProvider storeProvider) {
-                return storeProvider.getSchedulerStore().fetchFrameworkId();
-              }
-            });
-        driverRef.set(driverFactory.apply(frameworkId));
-
-        // Shut down if registration has not been acknowledged by the time this action runs.
-        delayedActions.onRegistrationTimeout(
-            new Runnable() {
-              @Override public void run() {
-                if (!registrationAcked.get()) {
-                  LOG.severe(
-                      "Framework has not been registered within the tolerated delay.");
-                  stateMachine.transition(State.DEAD);
-                }
-              }
-            });
-
-        // Unconditionally shut down when the auto-failover action runs.
-        delayedActions.onAutoFailover(
-            new Runnable() {
-              @Override public void run() {
-                LOG.info("Triggering automatic failover.");
-                stateMachine.transition(State.DEAD);
-              }
-            });
-
-        Protos.Status status = driver.start();
-        LOG.info("Driver started with code " + status);
-        delayedActions.blockingDriverJoin(new Runnable() {
-          @Override public void run() {
-            // Blocks until driver exits.
-            driver.join();
-            stateMachine.transition(State.DEAD);
-          }
-        });
-      }
-    };
-
-    // Attached below to the LEADER_AWAITING_REGISTRATION state (non-DEAD transitions only):
-    // records the registration and advertises leadership, shutting down if advertising fails.
-    final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
-      @Override public void execute(Transition<State> transition) {
-        registrationAcked.set(true);
-        try {
-          leaderControl.get().advertise();
-        } catch (JoinException e) {
-          LOG.log(Level.SEVERE, "Failed to advertise leader, shutting down.", e);
-          stateMachine.transition(State.DEAD);
-        } catch (InterruptedException e) {
-          LOG.log(Level.SEVERE, "Interrupted while advertising leader, shutting down.", e);
-          stateMachine.transition(State.DEAD);
-          // Restore the interrupt flag for callers further up the stack.
-          Thread.currentThread().interrupt();
-        }
-      }
-    };
-
-    // Attached below to any transition into DEAD.  The flag makes the teardown run at most once
-    // even though DEAD may be re-entered.
-    final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
-      private final AtomicBoolean invoked = new AtomicBoolean(false);
-      @Override public void execute(Transition<State> transition) {
-        if (!invoked.compareAndSet(false, true)) {
-          LOG.info("Shutdown already invoked, ignoring extra call.");
-          return;
-        }
-
-        // TODO(wfarner): Consider using something like guava's Closer to abstractly tear down
-        // resources here.
-        try {
-          // leaderControl is only set once onLeading() has been called, so it may be null here.
-          LeaderControl control = leaderControl.get();
-          if (control != null) {
-            try {
-              control.leave();
-            } catch (JoinException e) {
-              LOG.log(Level.WARNING, "Failed to leave leadership: " + e, e);
-            } catch (ServerSet.UpdateException e) {
-              LOG.log(Level.WARNING, "Failed to leave server set: " + e, e);
-            }
-          }
-
-          // TODO(wfarner): Re-evaluate tear-down ordering here.  Should the top-level shutdown
-          // be invoked first, or the underlying critical components?
-          driver.stop();
-          storage.stop();
-        } finally {
-          // Always shut the lifecycle down, even if stopping the driver or storage throws.
-          lifecycle.shutdown();
-        }
-      }
-    };
-
-    // Each addState call names a state followed by the states it may transition to, optionally
-    // preceded by the closure attached to that state.
-    stateMachine = StateMachine.<State>builder("SchedulerLifecycle")
-        .initialState(State.IDLE)
-        .logTransitions()
-        .addState(
-            Closures.filter(NOT_DEAD, prepareStorage),
-            State.IDLE,
-            State.PREPARING_STORAGE, State.DEAD)
-        .addState(
-            State.PREPARING_STORAGE,
-            State.STORAGE_PREPARED, State.DEAD)
-        .addState(
-            Closures.filter(NOT_DEAD, handleLeading),
-            State.STORAGE_PREPARED,
-            State.LEADER_AWAITING_REGISTRATION, State.DEAD)
-        .addState(
-            Closures.filter(NOT_DEAD, handleRegistered),
-            State.LEADER_AWAITING_REGISTRATION,
-            State.REGISTERED_LEADER, State.DEAD)
-        .addState(
-            State.REGISTERED_LEADER,
-            State.RUNNING, State.DEAD)
-        .addState(
-            State.RUNNING,
-            State.DEAD)
-        .addState(
-            State.DEAD,
-            // Allow cycles in DEAD to prevent throwing and avoid the need for call-site checking.
-            State.DEAD
-        )
-        .onAnyTransition(
-            Closures.filter(IS_DEAD, shutDown))
-        .build();
-
-    this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
-  }
-
-  /**
-   * Prepares a scheduler to offer itself as a leader candidate.  After this call the scheduler will
-   * host a live log replica and start syncing data from the leader via the log until it gets called
-   * upon to lead.
-   * <p>
-   * This requests the transition to {@code PREPARING_STORAGE}; per the class documentation,
-   * calling it out of order fails with {@link java.lang.IllegalStateException}.
-   *
-   * @return A listener that can be offered for leadership of a distributed election.
-   */
-  public LeadershipListener prepare() {
-    stateMachine.transition(State.PREPARING_STORAGE);
-    return leadershipListener;
-  }
-
-  /**
-   * Event bus subscriber invoked when the driver has registered; requests the transition to
-   * {@code REGISTERED_LEADER}.
-   *
-   * @param event Registration event; its contents are not inspected here.
-   */
-  @Subscribe
-  public void registered(DriverRegistered event) {
-    stateMachine.transition(State.REGISTERED_LEADER);
-  }
-
-  /**
-   * Maintains a reference to the driver.  The reference starts out unset and is assigned by the
-   * enclosing lifecycle once the driver has been created.
-   */
-  static class DriverReference implements Supplier<Optional<SchedulerDriver>> {
-    private final AtomicReference<SchedulerDriver> driver = Atomics.newReference();
-
-    /**
-     * @return The driver, or an absent value if none has been set yet.
-     */
-    @Override public Optional<SchedulerDriver> get() {
-      return Optional.fromNullable(driver.get());
-    }
-
-    // Private so that only the enclosing SchedulerLifecycle can assign the driver.
-    private void set(SchedulerDriver ref) {
-      driver.set(ref);
-    }
-  }
-
-  /**
-   * Leadership listener that translates election callbacks into state machine transitions.
-   */
-  private static class SchedulerCandidateImpl implements LeadershipListener {
-    private final StateMachine<State> stateMachine;
-    private final AtomicReference<LeaderControl> leaderControl;
-
-    SchedulerCandidateImpl(
-        StateMachine<State> stateMachine,
-        AtomicReference<LeaderControl> leaderControl) {
-
-      this.stateMachine = stateMachine;
-      this.leaderControl = leaderControl;
-    }
-
-    @Override public void onLeading(LeaderControl control) {
-      // Store the control before transitioning, since it is read later to advertise and leave.
-      leaderControl.set(control);
-      stateMachine.transition(State.LEADER_AWAITING_REGISTRATION);
-    }
-
-    @Override public void onDefeated(@Nullable ServerSet.EndpointStatus status) {
-      // The endpoint status is ignored; losing leadership always moves to DEAD.
-      LOG.severe("Lost leadership, committing suicide.");
-      stateMachine.transition(State.DEAD);
-    }
-  }
-
-  /**
-   * Options for tuning leadership behavior: how long to wait for framework registration and how
-   * long to lead before abdicating.
-   */
-  public static class LeadingOptions {
-    private final Amount<Long, Time> registrationDelayLimit;
-    private final Amount<Long, Time> leadingTimeLimit;
-
-    /**
-     * Creates a new collection of options for tuning leadership behavior.
-     *
-     * @param registrationDelayLimit Maximum amount of time to wait for framework registration to
-     *                               complete.  Must be non-null and non-negative.
-     * @param leadingTimeLimit Maximum amount of time to serve as leader before abdicating.  Must
-     *                         be non-null and non-negative.
-     * @throws NullPointerException If either limit is null.
-     * @throws IllegalArgumentException If either limit is negative.
-     */
-    public LeadingOptions(
-        Amount<Long, Time> registrationDelayLimit,
-        Amount<Long, Time> leadingTimeLimit) {
-
-      // Validate non-null before dereferencing, so a null argument is reported by name rather
-      // than as an anonymous NullPointerException from getValue().
-      checkNotNull(registrationDelayLimit, "Registration delay limit must not be null.");
-      checkNotNull(leadingTimeLimit, "Leading time limit must not be null.");
-
-      // Zero is accepted, so the messages say "non-negative" to match the checks.
-      Preconditions.checkArgument(
-          registrationDelayLimit.getValue() >= 0,
-          "Registration delay limit must be non-negative.");
-      Preconditions.checkArgument(
-          leadingTimeLimit.getValue() >= 0,
-          "Leading time limit must be non-negative.");
-
-      this.registrationDelayLimit = registrationDelayLimit;
-      this.leadingTimeLimit = leadingTimeLimit;
-    }
-  }
-
-  /**
-   * Hooks through which the lifecycle hands off work that is run later or on another thread.
-   * The production implementation built by the injected constructor uses a scheduled executor.
-   */
-  @VisibleForTesting
-  interface DelayedActions {
-    /**
-     * Accepts the work that joins the driver; the lifecycle passes a runnable that blocks until
-     * the driver exits.
-     */
-    void blockingDriverJoin(Runnable runnable);
-
-    /**
-     * Accepts the work that triggers automatic failover; the production implementation runs it
-     * after the leading time limit.
-     */
-    void onAutoFailover(Runnable runnable);
-
-    /**
-     * Accepts the work that checks for a missed registration; the production implementation
-     * runs it after the registration delay limit.
-     */
-    void onRegistrationTimeout(Runnable runnable);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java b/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
deleted file mode 100644
index be4c2b1..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/SchedulerModule.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Singleton;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import com.twitter.aurora.scheduler.Driver.DriverImpl;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.DriverReference;
-import com.twitter.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
-import com.twitter.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher;
-import com.twitter.aurora.scheduler.periodic.GcExecutorLauncher.GcExecutorSettings;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-/**
- * Binding module for top-level scheduling logic.
- */
-public class SchedulerModule extends AbstractModule {
-
-  @CmdLine(name = "executor_gc_interval",
-      help = "Max interval on which to run the GC executor on a host to clean up dead tasks.")
-  private static final Arg<Amount<Long, Time>> EXECUTOR_GC_INTERVAL =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "gc_executor_path", help = "Path to the gc executor launch script.")
-  private static final Arg<String> GC_EXECUTOR_PATH = Arg.create(null);
-
-  @CmdLine(name = "max_registration_delay",
-      help = "Max allowable delay to allow the driver to register before aborting")
-  private static final Arg<Amount<Long, Time>> MAX_REGISTRATION_DELAY =
-      Arg.create(Amount.of(1L, Time.MINUTES));
-
-  @CmdLine(name = "max_leading_duration",
-      help = "After leading for this duration, the scheduler should commit suicide.")
-  private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION =
-      Arg.create(Amount.of(1L, Time.DAYS));
-
-  @Override
-  protected void configure() {
-    // Driver and the supplier view of it are both singletons.
-    bind(Driver.class).to(DriverImpl.class);
-    bind(DriverImpl.class).in(Singleton.class);
-    bind(new TypeLiteral<Supplier<Optional<SchedulerDriver>>>() { }).to(DriverReference.class);
-    bind(DriverReference.class).in(Singleton.class);
-
-    bind(Scheduler.class).to(MesosSchedulerImpl.class);
-    bind(MesosSchedulerImpl.class).in(Singleton.class);
-
-    bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
-
-    Amount<Long, Time> gcInterval = EXECUTOR_GC_INTERVAL.get();
-    Optional<String> gcExecutorPath = Optional.fromNullable(GC_EXECUTOR_PATH.get());
-    bind(GcExecutorSettings.class).toInstance(new GcExecutorSettings(gcInterval, gcExecutorPath));
-
-    bind(GcExecutorLauncher.class).in(Singleton.class);
-    bind(UserTaskLauncher.class).in(Singleton.class);
-
-    install(lifecycleModule());
-
-    PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
-    PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
-  }
-
-  // Private module that keeps the lifecycle's options and executor bindings out of the parent
-  // injector, exposing only the SchedulerLifecycle singleton.
-  private static PrivateModule lifecycleModule() {
-    return new PrivateModule() {
-      @Override protected void configure() {
-        bind(LeadingOptions.class).toInstance(
-            new LeadingOptions(MAX_REGISTRATION_DELAY.get(), MAX_LEADING_DURATION.get()));
-        final ScheduledExecutorService lifecycleExecutor = Executors.newScheduledThreadPool(
-            1,
-            new ThreadFactoryBuilder().setNameFormat("Lifecycle-%d").setDaemon(true).build());
-        bind(ScheduledExecutorService.class).toInstance(lifecycleExecutor);
-        bind(SchedulerLifecycle.class).in(Singleton.class);
-        expose(SchedulerLifecycle.class);
-      }
-    };
-  }
-
-  /**
-   * Provides the task launchers, with the GC executor launcher ahead of the user task launcher.
-   */
-  @Provides
-  @Singleton
-  List<TaskLauncher> provideTaskLaunchers(
-      GcExecutorLauncher gcLauncher,
-      UserTaskLauncher userTaskLauncher) {
-
-    return ImmutableList.<TaskLauncher>of(gcLauncher, userTaskLauncher);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java b/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java
deleted file mode 100644
index 240649e..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/TaskIdGenerator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.UUID;
-
-import javax.inject.Inject;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.common.util.Clock;
-
-/**
- * A function that generates universally-unique (not guaranteed, but highly confident) task IDs.
- */
-public interface TaskIdGenerator {
-
-  /**
-   * Produces an identifier for a task that is intended to be universally unique.  Callers must
-   * not rely on repeatability: invoking this twice with equal arguments may yield different IDs.
-   *
-   * @param task Configuration of the task that needs an ID.
-   * @param instanceId Instance ID of the task.
-   * @return An ID for the task, unique with very high confidence.
-   */
-  String generate(ITaskConfig task, int instanceId);
-
-  class TaskIdGeneratorImpl implements TaskIdGenerator {
-    private final Clock clock;
-
-    @Inject
-    TaskIdGeneratorImpl(Clock clock) {
-      this.clock = Preconditions.checkNotNull(clock);
-    }
-
-    @Override
-    public String generate(ITaskConfig task, int instanceId) {
-      // Layout: <millis>-<role>-<environment>-<job>-<instance>-<uuid>.
-      // The leading timestamp allows chronological sorting; role, environment and job name
-      // identify the task and reduce collisions; the instance ID separates tasks of one job,
-      // and the random UUID is a last-resort guard against collisions.
-      String delimiter = "-";
-      String rawId = clock.nowMillis()
-          + delimiter + task.getOwner().getRole()
-          + delimiter + task.getEnvironment()
-          + delimiter + task.getJobName()
-          + delimiter + instanceId
-          + delimiter + UUID.randomUUID();
-
-      // Constrain the character set: anything that is not a word character or '-' becomes '-'.
-      return rawId.replaceAll("[^\\w-]", delimiter);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java b/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java
deleted file mode 100644
index aade6da..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/TaskLauncher.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-
-/**
- * A receiver of resource offers and task status updates.
- */
-public interface TaskLauncher {
-
-  /**
-   * Grants a resource offer to the task launcher.  If this launcher does not accept the offer,
-   * it will be passed to any subsequent task launchers.
-   * <p>
-   * A task launcher may choose to retain an offer for later use.  Any retained offers must be
-   * cleaned up with {@link #cancelOffer(OfferID)}.
-   *
-   * @param offer The resource offer.
-   * @return A task, absent if the launcher chooses not to accept the offer.
-   */
-  Optional<TaskInfo> createTask(Offer offer);
-
-  /**
-   * Informs the launcher that a status update has been received for a task.  If the task is not
-   * associated with the launcher, it should return {@code false} so that another launcher may
-   * receive it.
-   *
-   * @param status The status update.
-   * @return {@code true} if the status is relevant to the launcher and should not be delivered to
-   *     other launchers, {@code false} otherwise.
-   */
-  boolean statusUpdate(TaskStatus status);
-
-  /**
-   * Informs the launcher that a previously-advertised offer is canceled and may not be used.
-   *
-   * @param offer ID of the canceled offer.
-   */
-  void cancelOffer(OfferID offer);
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/TaskVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/TaskVars.java b/src/main/java/com/twitter/aurora/scheduler/TaskVars.java
deleted file mode 100644
index 5574631..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/TaskVars.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import com.twitter.aurora.scheduler.events.PubsubEvent.StorageStarted;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import com.twitter.aurora.scheduler.events.PubsubEvent.TasksDeleted;
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.stats.StatsProvider;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A container that tracks and exports stat counters for tasks.
- */
-class TaskVars implements EventSubscriber {
-  private static final Logger LOG = Logger.getLogger(TaskVars.class.getName());
-
-  private static final Predicate<Attribute> IS_RACK = new Predicate<Attribute>() {
-    @Override public boolean apply(Attribute attribute) {
-      return "rack".equals(attribute.getName());
-    }
-  };
-
-  private static final Function<Attribute, String> ATTR_VALUE = new Function<Attribute, String>() {
-    @Override public String apply(Attribute attribute) {
-      return Iterables.getOnlyElement(attribute.getValues());
-    }
-  };
-
-  // Events published before this class has tallied the stored tasks are dropped.  Another
-  // StorageStarted subscriber may run ahead of storageStarted() below and fire events for tasks
-  // that are not counted yet; reacting to those (e.g. in tasksDeleted) could push a counter
-  // below zero.
-  private volatile boolean storageStarted = false;
-
-  private final LoadingCache<String, AtomicLong> countersByStatus;
-  private final LoadingCache<String, AtomicLong> countersByRack;
-
-  private final Storage storage;
-
-  @Inject
-  TaskVars(Storage storage, final StatsProvider statProvider) {
-    this.storage = checkNotNull(storage);
-    checkNotNull(statProvider);
-
-    // Each counter is made by the stats provider the first time its key is looked up.
-    CacheLoader<String, AtomicLong> statusLoader = new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String statName) {
-        return statProvider.makeCounter(statName);
-      }
-    };
-    CacheLoader<String, AtomicLong> rackLoader = new CacheLoader<String, AtomicLong>() {
-      @Override public AtomicLong load(String rack) {
-        return statProvider.makeCounter(rackStatName(rack));
-      }
-    };
-    countersByStatus = CacheBuilder.newBuilder().build(statusLoader);
-    countersByRack = CacheBuilder.newBuilder().build(rackLoader);
-  }
-
-  @VisibleForTesting
-  static String getVarName(ScheduleStatus status) {
-    return "task_store_" + status;
-  }
-
-  @VisibleForTesting
-  static String rackStatName(String rack) {
-    return "tasks_lost_rack_" + rack;
-  }
-
-  private AtomicLong getCounter(ScheduleStatus status) {
-    return countersByStatus.getUnchecked(getVarName(status));
-  }
-
-  // Shifts one count from the old state to the task's status; LOST also bumps a rack counter.
-  @Subscribe
-  public void taskChangedState(TaskStateChange stateChange) {
-    if (storageStarted) {
-      IScheduledTask task = stateChange.getTask();
-      ScheduleStatus previous = stateChange.getOldState();
-      if (previous != ScheduleStatus.INIT) {
-        getCounter(previous).decrementAndGet();
-      }
-      getCounter(task.getStatus()).incrementAndGet();
-
-      if (stateChange.getNewState() == ScheduleStatus.LOST) {
-        recordLostTask(task.getAssignedTask().getSlaveHost());
-      }
-    }
-  }
-
-  // Increments the lost-task counter of the host's rack, or warns when no rack attribute exists.
-  private void recordLostTask(final String host) {
-    Optional<String> rackName = storage.consistentRead(new Work.Quiet<Optional<String>>() {
-      @Override public Optional<String> apply(StoreProvider storeProvider) {
-        return FluentIterable
-            .from(AttributeStore.Util.attributesOrNone(storeProvider, host))
-            .firstMatch(IS_RACK)
-            .transform(ATTR_VALUE);
-      }
-    });
-
-    if (!rackName.isPresent()) {
-      LOG.warning("Failed to find rack attribute associated with host " + host);
-      return;
-    }
-    countersByRack.getUnchecked(rackName.get()).incrementAndGet();
-  }
-
-  /**
-   * Seeds the per-status counters from every stored task, then starts honoring pubsub events.
-   */
-  @Subscribe
-  public void storageStarted(StorageStarted event) {
-    for (IScheduledTask stored : Storage.Util.consistentFetchTasks(storage, Query.unscoped())) {
-      getCounter(stored.getStatus()).incrementAndGet();
-    }
-
-    // Look up the counter of every status so that a stat exists (with value zero) even for
-    // states that no stored task is currently in.
-    for (ScheduleStatus status : ScheduleStatus.values()) {
-      getCounter(status);
-    }
-    storageStarted = true;
-  }
-
-  @Subscribe
-  public void tasksDeleted(final TasksDeleted event) {
-    if (storageStarted) {
-      for (IScheduledTask deleted : event.getTasks()) {
-        getCounter(deleted.getStatus()).decrementAndGet();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java b/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java
deleted file mode 100644
index 0fb3bbb..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/UserTaskLauncher.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskStatus;
-
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.scheduler.async.OfferQueue;
-import com.twitter.aurora.scheduler.base.Conversions;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.state.StateManager;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A task launcher that matches resource offers against user tasks.
- */
-class UserTaskLauncher implements TaskLauncher {
-
-  private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName());
-
-  // Marker looked for in the message of a status update that translates to FAILED.
-  @VisibleForTesting
-  static final String MEMORY_LIMIT_EXCEEDED = "MEMORY STATISTICS";
-
-  // Replacement message used when the marker above is found.
-  @VisibleForTesting
-  static final String MEMORY_LIMIT_DISPLAY = "Task used more memory than requested.";
-
-  private final OfferQueue offerQueue;
-  private final StateManager stateManager;
-
-  @Inject
-  UserTaskLauncher(OfferQueue offerQueue, StateManager stateManager) {
-    this.offerQueue = checkNotNull(offerQueue);
-    this.stateManager = checkNotNull(stateManager);
-  }
-
-  @Override
-  public Optional<TaskInfo> createTask(Offer offer) {
-    // The offer is only queued; this launcher never answers with a task directly.
-    offerQueue.addOffer(checkNotNull(offer));
-    return Optional.absent();
-  }
-
-  @Override
-  public synchronized boolean statusUpdate(TaskStatus status) {
-    // Every update is reported as handled (true) unless the state change itself throws.
-    @Nullable String message = status.hasMessage() ? status.getMessage() : null;
-
-    try {
-      ScheduleStatus translatedState = Conversions.convertProtoState(status.getState());
-      // TODO(William Farner): Remove this hack once Mesos API change is done.
-      //                       Tracked by: https://issues.apache.org/jira/browse/MESOS-343
-      boolean failed = translatedState == ScheduleStatus.FAILED;
-      if (failed && message != null && message.contains(MEMORY_LIMIT_EXCEEDED)) {
-        message = MEMORY_LIMIT_DISPLAY;
-      }
-
-      // The task is addressed by the ID carried in the status update.
-      stateManager.changeState(
-          Query.taskScoped(status.getTaskId().getValue()),
-          translatedState,
-          Optional.fromNullable(message));
-      return true;
-    } catch (SchedulerException e) {
-      // Log with the full status for context, then let the failure propagate.
-      LOG.log(Level.WARNING, "Failed to update status for: " + status, e);
-      throw e;
-    }
-  }
-
-  @Override
-  public void cancelOffer(OfferID offer) {
-    offerQueue.cancelOffer(offer);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java b/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java
deleted file mode 100644
index 24702b0..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/AppModule.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
-import com.google.inject.Provides;
-
-import org.apache.mesos.Scheduler;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.aurora.GuiceUtils;
-import com.twitter.aurora.scheduler.SchedulerModule;
-import com.twitter.aurora.scheduler.async.AsyncModule;
-import com.twitter.aurora.scheduler.events.PubsubEventModule;
-import com.twitter.aurora.scheduler.filter.SchedulingFilterImpl;
-import com.twitter.aurora.scheduler.http.ClusterName;
-import com.twitter.aurora.scheduler.http.ServletModule;
-import com.twitter.aurora.scheduler.metadata.MetadataModule;
-import com.twitter.aurora.scheduler.quota.QuotaModule;
-import com.twitter.aurora.scheduler.state.StateModule;
-import com.twitter.aurora.scheduler.stats.AsyncStatsModule;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.application.modules.LifecycleModule;
-import com.twitter.common.base.Command;
-import com.twitter.common.inject.TimedInterceptor;
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.ServerSetImpl;
-import com.twitter.common.zookeeper.SingletonService;
-import com.twitter.common.zookeeper.ZooKeeperClient;
-import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
-import com.twitter.common.zookeeper.ZooKeeperUtils;
-import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
-import com.twitter.thrift.ServiceInstance;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.common.base.MorePreconditions.checkNotBlank;
-
-/**
- * Binding module for the aurora scheduler application.
- */
-class AppModule extends AbstractModule {
-  private static final Logger LOG = Logger.getLogger(AppModule.class.getName());
-
-  private final String clusterName;
-  private final String serverSetPath;
-  private final ClientConfig zkClientConfig;
-
-  AppModule(String clusterName, String serverSetPath, ClientConfig zkClientConfig) {
-    this.clusterName = checkNotBlank(clusterName);
-    this.serverSetPath = checkNotBlank(serverSetPath);
-    this.zkClientConfig = checkNotNull(zkClientConfig);
-  }
-
-  @Override
-  protected void configure() {
-    // Turn on timing of intercepted methods and repair of the context classloader.
-    TimedInterceptor.bind(binder());
-    GuiceUtils.bindJNIContextClassLoader(binder(), Scheduler.class);
-    GuiceUtils.bindExceptionTrap(binder(), Scheduler.class);
-
-    bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
-
-    Key<String> clusterNameKey = Key.get(String.class, ClusterName.class);
-    bind(clusterNameKey).toInstance(clusterName);
-
-    // Filter layering: notifier filter -> base impl
-    PubsubEventModule.bind(binder(), SchedulingFilterImpl.class);
-    bind(SchedulingFilterImpl.class).in(Singleton.class);
-
-    LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class);
-
-    install(new AsyncModule());
-    install(new AsyncStatsModule());
-    install(new MetadataModule());
-    install(new QuotaModule());
-    install(new ServletModule());
-    install(new SchedulerModule());
-    install(new StateModule());
-
-    bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
-  }
-
-  /**
-   * Startup command adding a shutdown action that logs the calling thread and its stack.
-   */
-  private static class RegisterShutdownStackPrinter implements Command {
-    private static final Function<StackTraceElement, String> STACK_ELEM_TOSTRING =
-        new Function<StackTraceElement, String>() {
-          @Override public String apply(StackTraceElement frame) {
-            return String.format(
-                "%s.%s(%s:%s)",
-                frame.getClassName(),
-                frame.getMethodName(),
-                frame.getFileName(),
-                frame.getLineNumber());
-          }
-        };
-
-    private final ShutdownRegistry shutdownRegistry;
-
-    @Inject
-    RegisterShutdownStackPrinter(ShutdownRegistry shutdownRegistry) {
-      this.shutdownRegistry = shutdownRegistry;
-    }
-
-    @Override
-    public void execute() {
-      shutdownRegistry.addAction(new Command() {
-        @Override public void execute() {
-          Thread current = Thread.currentThread();
-          String header = "Thread: " + current.getName() + " (id " + current.getId() + ")";
-          String frames = Joiner.on("\n  ").join(
-              Iterables.transform(Arrays.asList(current.getStackTrace()), STACK_ELEM_TOSTRING));
-          LOG.info("Shutdown initiated by: " + header + "\n" + frames);
-        }
-      });
-    }
-  }
-
-  @Provides
-  @Singleton
-  List<ACL> provideAcls() {
-    if (zkClientConfig.credentials != Credentials.NONE) {
-      return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL;
-    }
-
-    LOG.warning("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled.");
-    return ZooKeeperUtils.OPEN_ACL_UNSAFE;
-  }
-
-  @Provides
-  @Singleton
-  ServerSet provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) {
-    return new ServerSetImpl(client, zooKeeperAcls, serverSetPath);
-  }
-
-  @Provides
-  @Singleton
-  DynamicHostSet<ServiceInstance> provideSchedulerHostSet(ServerSet serverSet) {
-    // Used for a type re-binding of the serverset.
-    return serverSet;
-  }
-
-  @Provides
-  @Singleton
-  SingletonService provideSingletonService(
-      ZooKeeperClient client,
-      ServerSet serverSet,
-      List<ACL> zookeeperAcls) {
-
-    return new SingletonService(
-        serverSet,
-        SingletonService.createSingletonCandidate(client, serverSetPath, zookeeperAcls));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java b/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java
deleted file mode 100644
index 0ea2204..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/app/Log4jConfigurator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.app;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.twitter.common.logging.RootLogConfig;
-import com.twitter.common.logging.RootLogConfig.Configuration;
-import com.twitter.common.logging.log4j.GlogLayout;
-
-/**
- * Configures log4j logging.
- */
-final class Log4jConfigurator {
-  private static final java.util.logging.Logger LOG =
-      java.util.logging.Logger.getLogger(Log4jConfigurator.class.getName());
-
-  private Log4jConfigurator() {
-    // Static helpers only; never instantiated.
-  }
-
-  /**
-   * Configures log4j to write to stderr in the glog layout, at the level the glog config maps to.
-   *
-   * @param glogConfig The glog configuration in effect.
-   */
-  static void configureConsole(Configuration glogConfig) {
-    Preconditions.checkNotNull(glogConfig);
-    ConsoleAppender stderr = new ConsoleAppender(new GlogLayout(), ConsoleAppender.SYSTEM_ERR);
-    BasicConfigurator.configure(stderr);
-    Logger rootLogger = Logger.getRootLogger();
-    rootLogger.setLevel(getLevel(glogConfig));
-  }
-
-  private static Level getLevel(RootLogConfig.Configuration logConfig) {
-    switch (logConfig.getVlog()) {
-      case SEVERE:
-        return Level.ERROR;
-      case WARNING:
-        return Level.WARN;
-      case INFO:
-        return Level.INFO;
-      case CONFIG: // fall through
-      case FINE: // fall through
-      case FINER: // fall through
-      case FINEST:
-        return Level.TRACE;
-      default:
-        LOG.warning("Mapping unexpected vlog value of " + logConfig.getVlog() + " to log4j TRACE");
-        return Level.TRACE;
-    }
-  }
-}