Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:34 UTC

[41/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java
deleted file mode 100644
index 74e8c07..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/Entries.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteStreams;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.LogEntry._Fields;
-import com.twitter.common.stats.Stats;
-
-/**
- * Utility class for working with log entries.
- */
-final class Entries {
-
-  private static final Logger LOG = Logger.getLogger(Entries.class.getName());
-
-  private static final AtomicLong COMPRESSION_BYTES_SAVED =
-      Stats.exportLong("log_compressed_entry_bytes_saved");
-
-  private Entries() {
-    // Utility class.
-  }
-
-  /**
-   * Deflates a log entry and wraps it in a deflated entry.
-   * <p>
-   * This will encode the entry using the thrift binary codec, and will apply deflate compression to
-   * the resulting encoded data.
-   * <p>
-   * This operation is symmetric with {@link #inflate(LogEntry)}.
-   *
-   * @param entry Entry to deflate.
-   * @return An entry with the {@code deflatedEntry} field set with the deflated serialized value
-   *         of the original entry.
-   * @throws CodingException If the value could not be encoded or deflated.
-   */
-  static LogEntry deflate(LogEntry entry) throws CodingException {
-    byte[] data = thriftBinaryEncode(entry);
-    int initialLength = data.length;
-    LOG.info("Deflating log entry of size " + initialLength);
-    ByteArrayOutputStream deflated = new ByteArrayOutputStream();
-    DeflaterOutputStream deflater = new DeflaterOutputStream(deflated);
-    try {
-      deflater.write(data);
-      deflater.flush();
-      deflater.close();
-      byte[] deflatedData = deflated.toByteArray();
-      int bytesSaved = initialLength - deflatedData.length;
-      if (bytesSaved < 0) {
-        LOG.warning("Deflated entry is larger than original by " + (bytesSaved * -1) + " bytes");
-      } else {
-        LOG.info("Deflated log entry size: " + deflatedData.length + " (saved " + bytesSaved + ")");
-      }
-
-      COMPRESSION_BYTES_SAVED.addAndGet(bytesSaved);
-      return LogEntry.deflatedEntry(ByteBuffer.wrap(deflatedData));
-    } catch (IOException e) {
-      throw new CodingException("Failed to deflate snapshot: " + e, e);
-    }
-  }
-
-  /**
-   * Inflates and deserializes a deflated log entry.
-   * <p>
-   * This requires that the {@code deflatedEntry} field is set on the provided {@code entry}.
-   * The encoded value will be inflated and deserialized as a {@link LogEntry}.
-   *
-   * @param entry Entry to inflate, which must be a deflated entry.
-   * @return The inflated entry.
-   * @throws CodingException If the value could not be inflated or decoded.
-   */
-  static LogEntry inflate(LogEntry entry) throws CodingException {
-    Preconditions.checkArgument(entry.isSet(_Fields.DEFLATED_ENTRY));
-
-    ByteArrayOutputStream inflated = new ByteArrayOutputStream();
-    ByteBuffer data = entry.bufferForDeflatedEntry();
-    LOG.info("Inflating deflated log entry of size " + data.remaining());
-    InflaterInputStream inflater = new InflaterInputStream(
-        new ByteArrayInputStream(data.array(), data.position(), data.remaining()));
-    try {
-      ByteStreams.copy(inflater, inflated);
-      byte[] inflatedData = inflated.toByteArray();
-      LOG.info("Inflated log entry size: " + inflatedData.length);
-      return thriftBinaryDecode(inflatedData);
-    } catch (IOException e) {
-      throw new CodingException("Failed to inflate compressed log entry.", e);
-    }
-  }
-
-  /**
-   * Thrift binary-encodes a log entry.
-   *
-   * @param entry The entry to encode.
-   * @return The serialized entry value.
-   * @throws CodingException If the entry could not be encoded.
-   */
-  static byte[] thriftBinaryEncode(LogEntry entry) throws CodingException {
-    return ThriftBinaryCodec.encodeNonNull(entry);
-  }
-
-  /**
-   * Decodes a byte array containing thrift binary-encoded data.
-   *
-   * @param contents The data to decode.
-   * @return The deserialized entry.
-   * @throws CodingException If the entry could not be deserialized.
-   */
-  static LogEntry thriftBinaryDecode(byte[] contents) throws CodingException {
-    return ThriftBinaryCodec.decodeNonNull(LogEntry.class, contents);
-  }
-}
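
The deflate/inflate pair in Entries is symmetric: bytes written through a DeflaterOutputStream come back unchanged when read through an InflaterInputStream. A minimal standalone sketch of that round trip, using plain byte arrays instead of Thrift-encoded LogEntry values so none of the Aurora types are needed (the class and method names here are purely illustrative), could look like this:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.InflaterInputStream;

    public class DeflateRoundTrip {
      // Compresses data with DEFLATE, mirroring what Entries.deflate() does to the encoded entry.
      static byte[] deflate(byte[] data) throws IOException {
        ByteArrayOutputStream deflated = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(deflated)) {
          out.write(data);
        } // close() finishes the DEFLATE stream before toByteArray() is called.
        return deflated.toByteArray();
      }

      // Decompresses DEFLATE data, mirroring Entries.inflate().
      static byte[] inflate(byte[] data) throws IOException {
        ByteArrayOutputStream inflated = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(data))) {
          byte[] buffer = new byte[4096];
          for (int read = in.read(buffer); read != -1; read = in.read(buffer)) {
            inflated.write(buffer, 0, read);
          }
        }
        return inflated.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        byte[] original = "serialized log entry bytes".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(original, inflate(deflate(original)))); // true
      }
    }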

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java
deleted file mode 100644
index da29401..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogManager.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Bytes;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.storage.Frame;
-import com.twitter.aurora.gen.storage.FrameChunk;
-import com.twitter.aurora.gen.storage.FrameHeader;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.LogEntry._Fields;
-import com.twitter.aurora.gen.storage.Op;
-import com.twitter.aurora.gen.storage.RemoveTasks;
-import com.twitter.aurora.gen.storage.SaveHostAttributes;
-import com.twitter.aurora.gen.storage.SaveTasks;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.gen.storage.Transaction;
-import com.twitter.aurora.gen.storage.storageConstants;
-import com.twitter.aurora.scheduler.log.Log;
-import com.twitter.aurora.scheduler.log.Log.Entry;
-import com.twitter.aurora.scheduler.log.Log.Position;
-import com.twitter.aurora.scheduler.log.Log.Stream;
-import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Closure;
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.stats.Stats;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Manages opening, reading from and writing to a {@link Log}.
- */
-public final class LogManager {
-
-  /**
-   * Identifies the maximum log entry size to permit before chunking entries into frames.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @BindingAnnotation
-  public @interface MaxEntrySize { }
-
-  /**
-   * Binding annotation for settings regarding the way snapshots are written.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @BindingAnnotation
-  public @interface SnapshotSetting { }
-
-  private static final Logger LOG = Logger.getLogger(LogManager.class.getName());
-
-  private final Log log;
-  private final Amount<Integer, Data> maxEntrySize;
-  private final boolean deflateSnapshots;
-  private final ShutdownRegistry shutdownRegistry;
-
-  @Inject
-  LogManager(
-      Log log,
-      @MaxEntrySize Amount<Integer, Data> maxEntrySize,
-      @SnapshotSetting boolean deflateSnapshots,
-      ShutdownRegistry shutdownRegistry) {
-
-    this.log = checkNotNull(log);
-    this.maxEntrySize = checkNotNull(maxEntrySize);
-    this.deflateSnapshots = deflateSnapshots;
-    this.shutdownRegistry = checkNotNull(shutdownRegistry);
-  }
-
-  /**
-   * Opens the log for reading and writing.
-   *
-   * @return A stream manager that can be used to manipulate the log stream.
-   * @throws IOException If there is a problem opening the log.
-   */
-  public StreamManager open() throws IOException {
-    final Stream stream = log.open();
-    shutdownRegistry.addAction(new ExceptionalCommand<IOException>() {
-      @Override public void execute() throws IOException {
-        stream.close();
-      }
-    });
-    return new StreamManager(stream, deflateSnapshots, maxEntrySize);
-  }
-
-  /**
-   * Manages interaction with the log stream.  Log entries can be
-   * {@link #readFromBeginning(com.twitter.common.base.Closure) read from} the beginning,
-   * a {@link #startTransaction() transaction} consisting of one or more local storage
-   * operations can be committed atomically, or the log can be compacted by
-   * {@link #snapshot(com.twitter.aurora.gen.storage.Snapshot) snapshotting}.
-   */
-  public static class StreamManager {
-
-    private static MessageDigest createDigest() {
-      try {
-        return MessageDigest.getInstance("MD5");
-      } catch (NoSuchAlgorithmException e) {
-        throw new IllegalStateException("Could not find provider for standard algorithm 'MD5'", e);
-      }
-    }
-
-    private static class Vars {
-      private final AtomicInteger unSnapshottedTransactions =
-          Stats.exportInt("scheduler_log_un_snapshotted_transactions");
-      private final AtomicLong bytesWritten = Stats.exportLong("scheduler_log_bytes_written");
-      private final AtomicLong entriesWritten = Stats.exportLong("scheduler_log_entries_written");
-      private final AtomicLong badFramesRead = Stats.exportLong("scheduler_log_bad_frames_read");
-      private final AtomicLong bytesRead = Stats.exportLong("scheduler_log_bytes_read");
-      private final AtomicLong entriesRead = Stats.exportLong("scheduler_log_entries_read");
-      private final AtomicLong deflatedEntriesRead =
-          Stats.exportLong("scheduler_log_deflated_entries_read");
-      private final AtomicLong snapshots = Stats.exportLong("scheduler_log_snapshots");
-    }
-    private final Vars vars = new Vars();
-
-    private final Object writeMutex = new Object();
-    private final Stream stream;
-    private final boolean deflateSnapshots;
-    private final MessageDigest digest;
-    private final EntrySerializer entrySerializer;
-
-    StreamManager(Stream stream, boolean deflateSnapshots, Amount<Integer, Data> maxEntrySize) {
-      this.stream = checkNotNull(stream);
-      this.deflateSnapshots = deflateSnapshots;
-      digest = createDigest();
-      entrySerializer = new EntrySerializer(digest, maxEntrySize);
-    }
-
-    /**
-     * Reads all entries in the log stream after the given position.  If the position
-     * supplied is {@code null} then all log entries in the stream will be read.
-     *
-     * @param reader A reader that will be handed log entries decoded from the stream.
-     * @throws CodingException if there was a problem decoding a log entry from the stream.
-     * @throws InvalidPositionException if the given position is not found in the log.
-     * @throws StreamAccessException if there is a problem reading from the log.
-     */
-    public void readFromBeginning(Closure<LogEntry> reader)
-        throws CodingException, InvalidPositionException, StreamAccessException {
-
-      Iterator<Entry> entries = stream.readAll();
-
-      while (entries.hasNext()) {
-        LogEntry logEntry = decodeLogEntry(entries.next());
-        while (logEntry != null && isFrame(logEntry)) {
-          logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
-        }
-        if (logEntry != null) {
-          if (logEntry.isSet(_Fields.DEFLATED_ENTRY)) {
-            logEntry = Entries.inflate(logEntry);
-            vars.deflatedEntriesRead.incrementAndGet();
-          }
-
-          reader.execute(logEntry);
-          vars.entriesRead.incrementAndGet();
-        }
-      }
-    }
-
-    @Nullable
-    private LogEntry tryDecodeFrame(Frame frame, Iterator<Entry> entries) throws CodingException {
-      if (!isHeader(frame)) {
-        LOG.warning("Found a frame with no preceding header, skipping.");
-        return null;
-      }
-      FrameHeader header = frame.getHeader();
-      byte[][] chunks = new byte[header.chunkCount][];
-
-      digest.reset();
-      for (int i = 0; i < header.chunkCount; i++) {
-        if (!entries.hasNext()) {
-          logBadFrame(header, i);
-          return null;
-        }
-        LogEntry logEntry = decodeLogEntry(entries.next());
-        if (!isFrame(logEntry)) {
-          logBadFrame(header, i);
-          return logEntry;
-        }
-        Frame chunkFrame = logEntry.getFrame();
-        if (!isChunk(chunkFrame)) {
-          logBadFrame(header, i);
-          return logEntry;
-        }
-        byte[] chunkData = chunkFrame.getChunk().getData();
-        digest.update(chunkData);
-        chunks[i] = chunkData;
-      }
-      if (!Arrays.equals(header.getChecksum(), digest.digest())) {
-        throw new CodingException("Read back a framed log entry that failed its checksum");
-      }
-      return Entries.thriftBinaryDecode(Bytes.concat(chunks));
-    }
-
-    private static boolean isFrame(LogEntry logEntry) {
-      return logEntry.getSetField() == LogEntry._Fields.FRAME;
-    }
-
-    private static boolean isChunk(Frame frame) {
-      return frame.getSetField() == Frame._Fields.CHUNK;
-    }
-
-    private static boolean isHeader(Frame frame) {
-      return frame.getSetField() == Frame._Fields.HEADER;
-    }
-
-    private void logBadFrame(FrameHeader header, int chunkIndex) {
-      LOG.info(String.format("Found an aborted transaction, required %d frames and found %d",
-          header.chunkCount, chunkIndex));
-      vars.badFramesRead.incrementAndGet();
-    }
-
-    private LogEntry decodeLogEntry(Entry entry) throws CodingException {
-      byte[] contents = entry.contents();
-      vars.bytesRead.addAndGet(contents.length);
-      return Entries.thriftBinaryDecode(contents);
-    }
-
-    /**
-     * Truncates all entries in the log stream occurring before the given position.  The entry at the
-     * given position becomes the first entry in the stream when this call completes.
-     *
-     * @param position The last position to keep in the stream.
-     * @throws InvalidPositionException if the specified position does not exist in this log.
-     * @throws StreamAccessException if the stream could not be truncated.
-     */
-    void truncateBefore(Position position) {
-      stream.truncateBefore(position);
-    }
-
-    /**
-     * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
-     * atomically.
-     *
-     * @return StreamTransaction A transaction manager to handle batching up commits to the
-     *    underlying stream.
-     */
-    StreamTransaction startTransaction() {
-      return new StreamTransaction();
-    }
-
-    /**
-     * Adds a snapshot to the log and if successful, truncates the log entries preceding the
-     * snapshot.
-     *
-     * @param snapshot The snapshot to add.
-     * @throws CodingException if there was a problem encoding the snapshot into a log entry.
-     * @throws InvalidPositionException if there was a problem truncating before the snapshot.
-     * @throws StreamAccessException if there was a problem appending the snapshot to the log.
-     */
-    @Timed("log_manager_snapshot")
-    void snapshot(Snapshot snapshot)
-        throws CodingException, InvalidPositionException, StreamAccessException {
-
-      LogEntry entry = LogEntry.snapshot(snapshot);
-      if (deflateSnapshots) {
-        entry = Entries.deflate(entry);
-      }
-
-      Position position = appendAndGetPosition(entry);
-      vars.snapshots.incrementAndGet();
-      vars.unSnapshottedTransactions.set(0);
-      stream.truncateBefore(position);
-    }
-
-    @Timed("log_manager_append")
-    private Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
-      Position firstPosition = null;
-      byte[][] entries = entrySerializer.serialize(logEntry);
-      synchronized (writeMutex) { // ensure all sub-entries are written as a unit
-        for (byte[] entry : entries) {
-          Position position = stream.append(entry);
-          if (firstPosition == null) {
-            firstPosition = position;
-          }
-          vars.bytesWritten.addAndGet(entry.length);
-        }
-      }
-      vars.entriesWritten.incrementAndGet();
-      return firstPosition;
-    }
-
-    @VisibleForTesting
-    public static class EntrySerializer {
-      private final MessageDigest digest;
-      private final int maxEntrySizeBytes;
-
-      private EntrySerializer(MessageDigest digest, Amount<Integer, Data> maxEntrySize) {
-        this.digest = checkNotNull(digest);
-        maxEntrySizeBytes = maxEntrySize.as(Data.BYTES);
-      }
-
-      public EntrySerializer(Amount<Integer, Data> maxEntrySize) {
-        this(createDigest(), maxEntrySize);
-      }
-
-      /**
-       * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}.
-       *
-       * @param logEntry The log entry to serialize.
-       * @return Serialized and chunked log entry.
-       * @throws CodingException If the entry could not be serialized.
-       */
-      @VisibleForTesting
-      public byte[][] serialize(LogEntry logEntry) throws CodingException {
-        byte[] entry = Entries.thriftBinaryEncode(logEntry);
-        if (entry.length <= maxEntrySizeBytes) {
-          return new byte[][] {entry};
-        }
-
-        int chunks = (int) Math.ceil(entry.length / (double) maxEntrySizeBytes);
-        byte[][] frames = new byte[chunks + 1][];
-
-        frames[0] = encode(Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
-        for (int i = 0; i < chunks; i++) {
-          int offset = i * maxEntrySizeBytes;
-          ByteBuffer chunk =
-              ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
-          frames[i + 1] = encode(Frame.chunk(new FrameChunk(chunk)));
-        }
-        return frames;
-      }
-
-      private byte[] checksum(byte[] data) {
-        digest.reset();
-        return digest.digest(data);
-      }
-
-      private static byte[] encode(Frame frame) throws CodingException {
-        return Entries.thriftBinaryEncode(LogEntry.frame(frame));
-      }
-    }
-
-    /**
-     * Manages a single log stream append transaction.  Local storage ops can be added to the
-     * transaction and then later committed as an atomic unit.
-     */
-    final class StreamTransaction {
-      private final Transaction transaction =
-          new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
-      private final AtomicBoolean committed = new AtomicBoolean(false);
-
-      private StreamTransaction() {
-        // supplied by factory method
-      }
-
-      /**
-       * Appends any ops that have been added to this transaction to the log stream in a single
-       * atomic record.
-       *
-       * @return The position of the log entry committed in this transaction, if any.
-       * @throws CodingException If there was a problem encoding a log entry for commit.
-       */
-      Position commit() throws CodingException {
-        Preconditions.checkState(!committed.getAndSet(true),
-            "Can only call commit once per transaction.");
-
-        if (!transaction.isSetOps()) {
-          return null;
-        }
-
-        Position position = appendAndGetPosition(LogEntry.transaction(transaction));
-        vars.unSnapshottedTransactions.incrementAndGet();
-        return position;
-      }
-
-      /**
-       * Adds a local storage operation to this transaction.
-       *
-       * @param op The local storage op to add.
-       */
-      void add(Op op) {
-        Preconditions.checkState(!committed.get());
-
-        Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null;
-        if (prior == null || !coalesce(prior, op)) {
-          transaction.addToOps(op);
-        }
-      }
-
-      /**
-       * Tries to coalesce a new op into the prior to compact the binary representation and increase
-       * batching.
-       *
-       * <p>It's recommended that as new {@code Op}s are added, they be treated here although they
-       * need not be</p>
-       *
-       * @param prior The previous op.
-       * @param next The next op to be added.
-       * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
-       */
-      private boolean coalesce(Op prior, Op next) {
-        if (!prior.isSet() && !next.isSet()) {
-          return false;
-        }
-
-        Op._Fields priorType = prior.getSetField();
-        if (!priorType.equals(next.getSetField())) {
-          return false;
-        }
-
-        switch (priorType) {
-          case SAVE_FRAMEWORK_ID:
-            prior.setSaveFrameworkId(next.getSaveFrameworkId());
-            return true;
-
-          case SAVE_ACCEPTED_JOB:
-          case REMOVE_JOB:
-          case SAVE_QUOTA:
-          case REMOVE_QUOTA:
-            return false;
-
-          case SAVE_TASKS:
-            coalesce(prior.getSaveTasks(), next.getSaveTasks());
-            return true;
-          case REMOVE_TASKS:
-            coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
-            return true;
-          case SAVE_HOST_ATTRIBUTES:
-            return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
-          default:
-            LOG.warning("Unoptimized op: " + priorType);
-            return false;
-        }
-      }
-
-      private void coalesce(SaveTasks prior, SaveTasks next) {
-        if (next.isSetTasks()) {
-          if (prior.isSetTasks()) {
-            // It is an expected invariant that an operation may reference a task (identified by
-            // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
-            // the most recent task definition overrides the prior operation.
-            Map<String, ScheduledTask> coalesced = Maps.newHashMap();
-            for (ScheduledTask task : prior.getTasks()) {
-              coalesced.put(task.getAssignedTask().getTaskId(), task);
-            }
-            for (ScheduledTask task : next.getTasks()) {
-              coalesced.put(task.getAssignedTask().getTaskId(), task);
-            }
-            prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
-          } else {
-            prior.setTasks(next.getTasks());
-          }
-        }
-      }
-
-      private void coalesce(RemoveTasks prior, RemoveTasks next) {
-        if (next.isSetTaskIds()) {
-          if (prior.isSetTaskIds()) {
-            prior.setTaskIds(ImmutableSet.<String>builder()
-                .addAll(prior.getTaskIds())
-                .addAll(next.getTaskIds())
-                .build());
-          } else {
-            prior.setTaskIds(next.getTaskIds());
-          }
-        }
-      }
-
-      private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
-        if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
-          prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
-          return true;
-        }
-        return false;
-      }
-    }
-  }
-}
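
EntrySerializer above splits an oversized serialized entry into ceil(length / maxEntrySizeBytes) chunks and prepends a header frame holding the chunk count and an MD5 checksum of the whole payload, which tryDecodeFrame() verifies against the concatenated chunks on read-back. A standalone sketch of just that chunking and checksum arithmetic, using plain byte arrays rather than the Thrift Frame types (class and method names here are illustrative), might read:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    public class EntryChunking {
      // Splits data into chunks of at most maxChunkSize bytes, the same slicing done in
      // EntrySerializer.serialize() before each piece is wrapped in a FrameChunk.
      static byte[][] chunk(byte[] data, int maxChunkSize) {
        int chunks = (int) Math.ceil(data.length / (double) maxChunkSize);
        byte[][] result = new byte[chunks][];
        for (int i = 0; i < chunks; i++) {
          int offset = i * maxChunkSize;
          result[i] = Arrays.copyOfRange(
              data, offset, offset + Math.min(maxChunkSize, data.length - offset));
        }
        return result;
      }

      // MD5 over the full serialized entry, the digest carried in the FrameHeader and checked
      // when the frames are read back.
      static byte[] checksum(byte[] data) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("MD5").digest(data);
      }

      public static void main(String[] args) throws NoSuchAlgorithmException {
        byte[] entry = new byte[1300];
        System.out.println(chunk(entry, 512).length);  // 3 chunks: 512 + 512 + 276 bytes
        System.out.println(checksum(entry).length);    // 16-byte MD5 digest
      }
    }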

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java
deleted file mode 100644
index 74f06aa..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorage.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.io.IOException;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.Date;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.Op;
-import com.twitter.aurora.gen.storage.RemoveJob;
-import com.twitter.aurora.gen.storage.RemoveLock;
-import com.twitter.aurora.gen.storage.RemoveQuota;
-import com.twitter.aurora.gen.storage.RemoveTasks;
-import com.twitter.aurora.gen.storage.RewriteTask;
-import com.twitter.aurora.gen.storage.SaveAcceptedJob;
-import com.twitter.aurora.gen.storage.SaveFrameworkId;
-import com.twitter.aurora.gen.storage.SaveHostAttributes;
-import com.twitter.aurora.gen.storage.SaveLock;
-import com.twitter.aurora.gen.storage.SaveQuota;
-import com.twitter.aurora.gen.storage.SaveTasks;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
-import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import com.twitter.aurora.scheduler.storage.AttributeStore;
-import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
-import com.twitter.aurora.scheduler.storage.ForwardingStore;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.aurora.scheduler.storage.TaskStore;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-import com.twitter.aurora.scheduler.storage.log.LogManager.StreamManager;
-import com.twitter.aurora.scheduler.storage.log.LogManager.StreamManager.StreamTransaction;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Closure;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A storage implementation that ensures committed transactions are written to a log.
- *
- * <p>In the classic write-ahead log usage we'd perform mutations as follows:
- * <ol>
- *   <li>write op to log</li>
- *   <li>perform op locally</li>
- *   <li>*checkpoint</li>
- * </ol>
- *
- * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we
- * have a record of our mutation in case we should need to recover state later after a crash or on
- * a new host (assuming the log is distributed).  We then apply the mutation to a local (in-memory)
- * data structure for serving fast read requests and then optionally write down the position of the
- * log entry we wrote in the first step to stable storage to allow for quicker recovery after a
- * crash. Instead of reading the whole log, we can read all entries past the checkpoint.  This
- * design implies that all mutations must be idempotent and free from constraint and thus
- * replayable over newer operations when recovering from an old checkpoint.
- *
- * <p>The important detail in our case is the possibility of writing an op to the log, and then
- * failing to commit locally since we use a local database instead of an in-memory data structure.
- * If we die after such a failure, then another instance can read and apply the logged op
- * erroneously.
- *
- * <p>This implementation leverages a local transaction to handle this:
- * <ol>
- *   <li>start local transaction</li>
- *   <li>perform op locally (uncommitted!)</li>
- *   <li>write op to log</li>
- *   <li>commit local transaction</li>
- *   <li>*checkpoint</li>
- * </ol>
- *
- * <p>If the op fails to apply to local storage we will never write the op to the log and if the op
- * fails to apply to the log, it'll throw and abort the local storage transaction as well.
- */
-public class LogStorage extends ForwardingStore
-    implements NonVolatileStorage, DistributedSnapshotStore {
-
-  /**
-   * A service that can schedule an action to be executed periodically.
-   */
-  @VisibleForTesting
-  interface SchedulingService {
-
-    /**
-     * Schedules an action to execute periodically.
-     *
-     * @param interval The time period to wait until running the {@code action} again.
-     * @param action The action to execute periodically.
-     */
-    void doEvery(Amount<Long, Time> interval, Runnable action);
-  }
-
-  private static class ScheduledExecutorSchedulingService implements SchedulingService {
-    private final ScheduledExecutorService scheduledExecutor;
-
-    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
-        Amount<Long, Time> shutdownGracePeriod) {
-      scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-      shutdownRegistry.addAction(
-          new ExecutorServiceShutdown(scheduledExecutor, shutdownGracePeriod));
-    }
-
-    @Override
-    public void doEvery(Amount<Long, Time> interval, Runnable action) {
-      checkNotNull(interval);
-      checkNotNull(action);
-
-      long delay = interval.getValue();
-      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
-      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
-    }
-  }
-
-  private static final Logger LOG = Logger.getLogger(LogStorage.class.getName());
-
-  private final LogManager logManager;
-  private final SchedulingService schedulingService;
-  private final SnapshotStore<Snapshot> snapshotStore;
-  private final Amount<Long, Time> snapshotInterval;
-
-  private StreamManager streamManager;
-
-  private boolean recovered = false;
-  private StreamTransaction transaction = null;
-
-  private final MutableStoreProvider logStoreProvider = new MutableStoreProvider() {
-    @Override public SchedulerStore.Mutable getSchedulerStore() {
-      return LogStorage.this;
-    }
-
-    @Override public JobStore.Mutable getJobStore() {
-      return LogStorage.this;
-    }
-
-    @Override public TaskStore getTaskStore() {
-      return LogStorage.this;
-    }
-
-    @Override public TaskStore.Mutable getUnsafeTaskStore() {
-      return LogStorage.this;
-    }
-
-    @Override public LockStore.Mutable getLockStore() {
-      return LogStorage.this;
-    }
-
-    @Override public QuotaStore.Mutable getQuotaStore() {
-      return LogStorage.this;
-    }
-
-    @Override public AttributeStore.Mutable getAttributeStore() {
-      return LogStorage.this;
-    }
-  };
-
-  /**
-   * Identifies the grace period to give in-process snapshots and checkpoints to complete during
-   * shutdown.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @BindingAnnotation
-  public @interface ShutdownGracePeriod { }
-
-  /**
-   * Identifies the interval between snapshots of local storage truncating the log.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @BindingAnnotation
-  public @interface SnapshotInterval { }
-
-  /**
-   * Identifies a local storage layer that is written to only after first ensuring the write
-   * operation is persisted in the log.
-   */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
-  @BindingAnnotation
-  public @interface WriteBehind { }
-
-  @Inject
-  LogStorage(LogManager logManager,
-             ShutdownRegistry shutdownRegistry,
-             @ShutdownGracePeriod Amount<Long, Time> shutdownGracePeriod,
-             SnapshotStore<Snapshot> snapshotStore,
-             @SnapshotInterval Amount<Long, Time> snapshotInterval,
-             @WriteBehind Storage storage,
-             @WriteBehind SchedulerStore.Mutable schedulerStore,
-             @WriteBehind JobStore.Mutable jobStore,
-             @WriteBehind TaskStore.Mutable taskStore,
-             @WriteBehind LockStore.Mutable lockStore,
-             @WriteBehind QuotaStore.Mutable quotaStore,
-             @WriteBehind AttributeStore.Mutable attributeStore) {
-
-    this(logManager,
-        new ScheduledExecutorSchedulingService(shutdownRegistry, shutdownGracePeriod),
-        snapshotStore,
-        snapshotInterval,
-        storage,
-        schedulerStore,
-        jobStore,
-        taskStore,
-        lockStore,
-        quotaStore,
-        attributeStore);
-  }
-
-  @VisibleForTesting
-  LogStorage(LogManager logManager,
-             SchedulingService schedulingService,
-             SnapshotStore<Snapshot> snapshotStore,
-             Amount<Long, Time> snapshotInterval,
-             Storage storage,
-             SchedulerStore.Mutable schedulerStore,
-             JobStore.Mutable jobStore,
-             TaskStore.Mutable taskStore,
-             LockStore.Mutable lockStore,
-             QuotaStore.Mutable quotaStore,
-             AttributeStore.Mutable attributeStore) {
-
-    super(storage, schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
-    this.logManager = checkNotNull(logManager);
-    this.schedulingService = checkNotNull(schedulingService);
-    this.snapshotStore = checkNotNull(snapshotStore);
-    this.snapshotInterval = checkNotNull(snapshotInterval);
-  }
-
-  @Override
-  public synchronized void prepare() {
-    // Open the log to make a log replica available to the scheduler group.
-    try {
-      streamManager = logManager.open();
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to open the log, cannot continue", e);
-    }
-
-    // TODO(John Sirois): start incremental recovery here from the log and do a final recovery
-    // catchup in start after shutting down the incremental syncer.
-  }
-
-  @Override
-  public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        // Must have the underlying storage started so we can query it for the last checkpoint.
-        // We replay these entries in the forwarded storage system's transactions but not ours - we
-        // do not want to re-record these ops to the log.
-        recover();
-        recovered = true;
-
-        // Now that we're recovered we should let any mutations done in initializationLogic append
-        // to the log, so run it in one of our transactions.
-        write(initializationLogic);
-      }
-    });
-
-    scheduleSnapshots();
-  }
-
-  @Override
-  public void stop() {
-    // No-op.
-  }
-
-  @Timed("scheduler_log_recover")
-  void recover() throws RecoveryFailedException {
-    try {
-      streamManager.readFromBeginning(new Closure<LogEntry>() {
-        @Override public void execute(LogEntry logEntry) {
-          replay(logEntry);
-        }
-      });
-    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
-      throw new RecoveryFailedException(e);
-    }
-  }
-
-  private static final class RecoveryFailedException extends SchedulerException {
-    private RecoveryFailedException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  void replay(final LogEntry logEntry) {
-    switch (logEntry.getSetField()) {
-      case SNAPSHOT:
-        Snapshot snapshot = logEntry.getSnapshot();
-        LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
-        snapshotStore.applySnapshot(snapshot);
-        break;
-
-      case TRANSACTION:
-        for (Op op : logEntry.getTransaction().getOps()) {
-          replayOp(op);
-        }
-        break;
-
-      case NOOP:
-        // Nothing to do here
-        break;
-
-      case DEFLATED_ENTRY:
-        throw new IllegalArgumentException("Deflated entries are not handled at this layer.");
-
-      case FRAME:
-        throw new IllegalArgumentException("Framed entries are not handled at this layer.");
-
-      default:
-        throw new IllegalStateException("Unknown log entry type: " + logEntry);
-    }
-  }
-
-  private void replayOp(Op op) {
-    switch (op.getSetField()) {
-      case SAVE_FRAMEWORK_ID:
-        saveFrameworkId(op.getSaveFrameworkId().getId());
-        break;
-
-      case SAVE_ACCEPTED_JOB:
-        SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
-        saveAcceptedJob(
-            acceptedJob.getManagerId(),
-            IJobConfiguration.build(acceptedJob.getJobConfig()));
-        break;
-
-      case REMOVE_JOB:
-        removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
-        break;
-
-      case SAVE_TASKS:
-        saveTasks(IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
-        break;
-
-      case REWRITE_TASK:
-        RewriteTask rewriteTask = op.getRewriteTask();
-        unsafeModifyInPlace(rewriteTask.getTaskId(), ITaskConfig.build(rewriteTask.getTask()));
-        break;
-
-      case REMOVE_TASKS:
-        deleteTasks(op.getRemoveTasks().getTaskIds());
-        break;
-
-      case SAVE_QUOTA:
-        SaveQuota saveQuota = op.getSaveQuota();
-        saveQuota(saveQuota.getRole(), IQuota.build(saveQuota.getQuota()));
-        break;
-
-      case REMOVE_QUOTA:
-        removeQuota(op.getRemoveQuota().getRole());
-        break;
-
-      case SAVE_HOST_ATTRIBUTES:
-        saveHostAttributes(op.getSaveHostAttributes().hostAttributes);
-        break;
-
-      case SAVE_LOCK:
-        saveLock(ILock.build(op.getSaveLock().getLock()));
-        break;
-
-      case REMOVE_LOCK:
-        removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
-        break;
-
-      default:
-        throw new IllegalStateException("Unknown transaction op: " + op);
-    }
-  }
-
-  private void scheduleSnapshots() {
-    if (snapshotInterval.getValue() > 0) {
-      schedulingService.doEvery(snapshotInterval, new Runnable() {
-        @Override public void run() {
-          try {
-            snapshot();
-          } catch (StorageException e) {
-            if (e.getCause() != null) {
-              LOG.log(Level.WARNING, e.getMessage(), e.getCause());
-            } else {
-              LOG.log(Level.WARNING, "StorageException when attempting to snapshot.", e);
-            }
-          }
-        }
-      });
-    }
-  }
-
-  /**
-   * Forces a snapshot of the storage state.
-   *
-   * @throws CodingException If there is a problem encoding the snapshot.
-   * @throws InvalidPositionException If the log stream cursor is invalid.
-   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
-   */
-  @Timed("scheduler_log_snapshot")
-  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
-    super.write(new MutateWork.NoResult<CodingException>() {
-      @Override protected void execute(MutableStoreProvider unused)
-          throws CodingException, InvalidPositionException, StreamAccessException {
-
-        persist(snapshotStore.createSnapshot());
-      }
-    });
-  }
-
-  @Timed("scheduler_log_snapshot_persist")
-  @Override
-  public void persist(Snapshot snapshot)
-      throws CodingException, InvalidPositionException, StreamAccessException {
-
-    streamManager.snapshot(snapshot);
-  }
-
-  @Override
-  public synchronized <T, E extends Exception> T write(final MutateWork<T, E> work)
-      throws StorageException, E {
-
-    // We don't want to use the log when recovering from it, we just want to update the underlying
-    // store - so pass mutations straight through to the underlying storage.
-    if (!recovered) {
-      return super.write(work);
-    }
-
-    // The log stream transaction has already been set up so we just need to delegate with our
-    // store provider so any mutations performed by work get logged.
-    if (transaction != null) {
-      return super.write(new MutateWork<T, E>() {
-        @Override public T apply(MutableStoreProvider unused) throws E {
-          return work.apply(logStoreProvider);
-        }
-      });
-    }
-
-    transaction = streamManager.startTransaction();
-    try {
-      return super.write(new MutateWork<T, E>() {
-        @Override public T apply(MutableStoreProvider unused) throws E {
-          T result = work.apply(logStoreProvider);
-          try {
-            transaction.commit();
-          } catch (CodingException e) {
-            throw new IllegalStateException(
-                "Problem encoding transaction operations to the log stream", e);
-          } catch (StreamAccessException e) {
-            throw new StorageException(
-                "There was a problem committing the transaction to the log.", e);
-          }
-          return result;
-        }
-      });
-    } finally {
-      transaction = null;
-    }
-  }
-
-  @Timed("scheduler_log_save_framework_id")
-  @Override
-  public void saveFrameworkId(final String frameworkId) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
-        LogStorage.super.saveFrameworkId(frameworkId);
-      }
-    });
-  }
-
-  @Timed("scheduler_log_job_save")
-  @Override
-  public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
-        LogStorage.super.saveAcceptedJob(managerId, jobConfig);
-      }
-    });
-  }
-
-  @Timed("scheduler_log_job_remove")
-  @Override
-  public void removeJob(final IJobKey jobKey) {
-    checkNotNull(jobKey);
-
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
-        LogStorage.super.removeJob(jobKey);
-      }
-    });
-  }
-
-  @Timed("scheduler_log_tasks_save")
-  @Override
-  public void saveTasks(final Set<IScheduledTask> newTasks) throws IllegalStateException {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
-        LogStorage.super.saveTasks(newTasks);
-      }
-    });
-  }
-
-  @Override
-  public void deleteAllTasks() {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        Query.Builder query = Query.unscoped();
-        Set<String> ids = FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
-            .transform(Tasks.SCHEDULED_TO_ID)
-            .toSet();
-        deleteTasks(ids);
-      }
-    });
-  }
-
-  @Timed("scheduler_log_tasks_remove")
-  @Override
-  public void deleteTasks(final Set<String> taskIds) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.removeTasks(new RemoveTasks(taskIds)));
-        LogStorage.super.deleteTasks(taskIds);
-      }
-    });
-  }
-
-  @Timed("scheduler_log_tasks_mutate")
-  @Override
-  public ImmutableSet<IScheduledTask> mutateTasks(
-      final Query.Builder query,
-      final Function<IScheduledTask, IScheduledTask> mutator) {
-
-    return write(new MutateWork.Quiet<ImmutableSet<IScheduledTask>>() {
-      @Override public ImmutableSet<IScheduledTask> apply(MutableStoreProvider unused) {
-        ImmutableSet<IScheduledTask> mutated = LogStorage.super.mutateTasks(query, mutator);
-
-        Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
-        if (LOG.isLoggable(Level.FINE)) {
-          LOG.fine("Storing updated tasks to log: "
-              + Maps.transformValues(tasksById, Tasks.GET_STATUS));
-        }
-
-        // TODO(William Farner): Avoid writing an op when mutated is empty.
-        log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
-        return mutated;
-      }
-    });
-  }
-
-  @Timed("scheduler_log_unsafe_modify_in_place")
-  @Override
-  public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
-    return write(new MutateWork.Quiet<Boolean>() {
-      @Override public Boolean apply(MutableStoreProvider storeProvider) {
-        boolean mutated = LogStorage.super.unsafeModifyInPlace(taskId, taskConfiguration);
-        if (mutated) {
-          log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
-        }
-        return mutated;
-      }
-    });
-  }
-
-  @Timed("scheduler_log_quota_remove")
-  @Override
-  public void removeQuota(final String role) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.removeQuota(new RemoveQuota(role)));
-        LogStorage.super.removeQuota(role);
-      }
-    });
-  }
-
-  @Timed("scheduler_log_quota_save")
-  @Override
-  public void saveQuota(final String role, final IQuota quota) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
-        LogStorage.super.saveQuota(role, quota);
-      }
-    });
-  }
-
-  @Timed("scheduler_save_host_attribute")
-  @Override
-  public void saveHostAttributes(final HostAttributes attrs) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        // Pass the updated attributes upstream, and then check if the stored value changes.
-        // We do this since different parts of the system write partial HostAttributes objects
-        // and they are merged together internally.
-        // TODO(William Farner): Split out a separate method
-        //                       saveAttributes(String host, Iterable<Attributes>) to simplify this.
-        Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(attrs.getHost());
-        LogStorage.super.saveHostAttributes(attrs);
-        Optional<HostAttributes> updated = LogStorage.super.getHostAttributes(attrs.getHost());
-        if (!saved.equals(updated)) {
-          log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
-        }
-      }
-    });
-  }
-
-  @Timed("scheduler_lock_save")
-  @Override
-  public void saveLock(final ILock lock) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.saveLock(new SaveLock(lock.newBuilder())));
-        LogStorage.super.saveLock(lock);
-      }
-    });
-  }
-
-  @Timed("scheduler_lock_remove")
-  @Override
-  public void removeLock(final ILockKey lockKey) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
-        LogStorage.super.removeLock(lockKey);
-      }
-    });
-  }
-
-  @Override
-  public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
-    write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider unused) {
-        Optional<HostAttributes> saved = LogStorage.super.getHostAttributes(host);
-        if (saved.isPresent()) {
-          HostAttributes attributes = saved.get().setMode(mode);
-          log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
-          LogStorage.super.saveHostAttributes(attributes);
-        }
-      }
-    });
-    return false;
-  }
-
-  @Override
-  public void snapshot() throws StorageException {
-    try {
-      doSnapshot();
-    } catch (CodingException e) {
-      throw new StorageException("Failed to encode a snapshot", e);
-    } catch (InvalidPositionException e) {
-      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
-    } catch (StreamAccessException e) {
-      throw new StorageException("Failed to create a snapshot", e);
-    }
-    super.snapshot();
-  }
-
-  private void log(Op op) {
-    if (recovered) {
-      transaction.add(op);
-    }
-  }
-}
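
The LogStorage class javadoc above describes the ordering the implementation relies on: start a local transaction, apply the mutation locally, append the op to the log, then commit locally, so a failed log append aborts the local transaction as well. A schematic sketch of that ordering, using hypothetical LocalStore and DurableLog interfaces rather than the real Aurora storage and log types, follows:

    // Hypothetical stand-ins for the storage and log abstractions; not the Aurora interfaces.
    interface LocalStore {
      void begin();
      void apply(String op);   // perform the mutation locally, still uncommitted
      void commit();
      void rollback();
    }

    interface DurableLog {
      void append(String op);  // may throw if the stream cannot be written
    }

    final class WriteAheadSketch {
      // Mirrors the ordering described in the LogStorage javadoc: mutate locally first, append
      // to the log, and only then commit, so a log failure leaves local state unchanged.
      static void mutate(LocalStore store, DurableLog log, String op) {
        store.begin();
        try {
          store.apply(op);     // 1. perform op locally (uncommitted)
          log.append(op);      // 2. write op to the log
          store.commit();      // 3. commit local transaction
        } catch (RuntimeException e) {
          store.rollback();    // any failure aborts the local transaction
          throw e;
        }
      }
    }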

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java
deleted file mode 100644
index 92568c8..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/LogStorageModule.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.lang.annotation.Annotation;
-
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-import com.google.inject.Key;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.scheduler.log.Log;
-import com.twitter.aurora.scheduler.storage.CallOrderEnforcingStorage;
-import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
-import com.twitter.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import com.twitter.aurora.scheduler.storage.log.LogManager.SnapshotSetting;
-import com.twitter.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
-import com.twitter.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Data;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-/**
- * Bindings for scheduler distributed log based storage.
- * <p/>
- * Requires bindings for:
- * <ul>
- *   <li>{@link Clock}</li>
- *   <li>{@link ShutdownRegistry}</li>
- *   <li>The concrete {@link Log} implementation.</li>
- * </ul>
- * <p/>
- */
-public class LogStorageModule extends AbstractModule {
-
-  @CmdLine(name = "dlog_shutdown_grace_period",
-           help = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
-                  + "actions to complete before forcibly shutting down.")
-  private static final Arg<Amount<Long, Time>> SHUTDOWN_GRACE_PERIOD =
-      Arg.create(Amount.of(2L, Time.SECONDS));
-
-  @CmdLine(name = "dlog_snapshot_interval",
-           help = "Specifies the frequency at which snapshots of local storage are taken and "
-                  + "written to the log.")
-  private static final Arg<Amount<Long, Time>> SNAPSHOT_INTERVAL =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "dlog_max_entry_size",
-           help = "Specifies the maximum entry size to append to the log. Larger entries will be "
-                  + "split across multiple Frame entries.")
-  @VisibleForTesting
-  public static final Arg<Amount<Integer, Data>> MAX_LOG_ENTRY_SIZE =
-      Arg.create(Amount.of(512, Data.KB));
-
-  @CmdLine(name = "deflate_snapshots", help = "Whether snapshots should be deflate-compressed.")
-  private static final Arg<Boolean> DEFLATE_SNAPSHOTS = Arg.create(true);
-
-  @Override
-  protected void configure() {
-    requireBinding(Log.class);
-    requireBinding(Clock.class);
-    requireBinding(ShutdownRegistry.class);
-
-    bindInterval(ShutdownGracePeriod.class, SHUTDOWN_GRACE_PERIOD);
-    bindInterval(SnapshotInterval.class, SNAPSHOT_INTERVAL);
-
-    bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
-        .toInstance(MAX_LOG_ENTRY_SIZE.get());
-    bind(LogManager.class).in(Singleton.class);
-    bind(Boolean.class).annotatedWith(SnapshotSetting.class).toInstance(DEFLATE_SNAPSHOTS.get());
-
-    bind(LogStorage.class).in(Singleton.class);
-    install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
-    bind(DistributedSnapshotStore.class).to(LogStorage.class);
-  }
-
-  private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {
-    bind(Key.get(new TypeLiteral<Amount<Long, Time>>() { }, key)).toInstance(value.get());
-  }
-}
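
For context on the module above: LogStorageModule installs the distributed-log storage bindings but deliberately leaves Clock, ShutdownRegistry, and the concrete Log implementation to the enclosing application. Below is a minimal sketch of a parent module that could satisfy those requirements; SchedulerStorageModule and its constructor parameter are illustrative names, and the twitter-commons Clock.SYSTEM_CLOCK and ShutdownRegistry.ShutdownRegistryImpl helpers are assumed to be available.

import javax.inject.Singleton;

import com.google.inject.AbstractModule;

import com.twitter.aurora.scheduler.log.Log;
import com.twitter.aurora.scheduler.storage.log.LogStorageModule;
import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.application.ShutdownRegistry.ShutdownRegistryImpl;
import com.twitter.common.util.Clock;

// Sketch: a parent module supplying the bindings LogStorageModule requires.
class SchedulerStorageModule extends AbstractModule {
  private final Class<? extends Log> logImpl;

  // logImpl is whatever concrete Log implementation the deployment uses.
  SchedulerStorageModule(Class<? extends Log> logImpl) {
    this.logImpl = logImpl;
  }

  @Override
  protected void configure() {
    bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
    bind(ShutdownRegistry.class).to(ShutdownRegistryImpl.class).in(Singleton.class);
    bind(Log.class).to(logImpl).in(Singleton.class);

    install(new LogStorageModule());
  }
}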

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java
deleted file mode 100644
index df6b899..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.Lock;
-import com.twitter.aurora.gen.storage.QuotaConfiguration;
-import com.twitter.aurora.gen.storage.SchedulerMetadata;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.gen.storage.StoredJob;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Volatile;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.util.BuildInfo;
-import com.twitter.common.util.Clock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import static com.twitter.aurora.gen.apiConstants.CURRENT_API_VERSION;
-
-/**
- * Snapshot store implementation that delegates to underlying snapshot stores by
- * extracting/applying fields in a snapshot thrift struct.
- */
-public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
-
-  private static final Logger LOG = Logger.getLogger(SnapshotStoreImpl.class.getName());
-
-  private static final SnapshotField ATTRIBUTE_FIELD = new SnapshotField() {
-    @Override public void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot) {
-      snapshot.setHostAttributes(storeProvider.getAttributeStore().getHostAttributes());
-    }
-
-    @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-      store.getAttributeStore().deleteHostAttributes();
-
-      if (snapshot.isSetHostAttributes()) {
-        for (HostAttributes attributes : snapshot.getHostAttributes()) {
-          store.getAttributeStore().saveHostAttributes(attributes);
-        }
-      }
-    }
-  };
-
-  private static final Iterable<SnapshotField> SNAPSHOT_FIELDS = Arrays.asList(
-      ATTRIBUTE_FIELD,
-      new SnapshotField() {
-        @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setTasks(
-              IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
-        }
-
-        @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-          store.getUnsafeTaskStore().deleteAllTasks();
-
-          if (snapshot.isSetTasks()) {
-            store.getUnsafeTaskStore().saveTasks(
-                IScheduledTask.setFromBuilders(snapshot.getTasks()));
-          }
-        }
-      },
-      new SnapshotField() {
-        @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          ImmutableSet.Builder<StoredJob> jobs = ImmutableSet.builder();
-          for (String managerId : store.getJobStore().fetchManagerIds()) {
-            for (IJobConfiguration config : store.getJobStore().fetchJobs(managerId)) {
-              jobs.add(new StoredJob(managerId, config.newBuilder()));
-            }
-          }
-          snapshot.setJobs(jobs.build());
-        }
-
-        @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-          store.getJobStore().deleteJobs();
-
-          if (snapshot.isSetJobs()) {
-            for (StoredJob job : snapshot.getJobs()) {
-              store.getJobStore().saveAcceptedJob(
-                  job.getJobManagerId(),
-                  IJobConfiguration.build(job.getJobConfiguration()));
-            }
-          }
-        }
-      },
-      new SnapshotField() {
-        @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          Properties props = new BuildInfo().getProperties();
-
-          snapshot.setSchedulerMetadata(
-                new SchedulerMetadata()
-                  .setFrameworkId(store.getSchedulerStore().fetchFrameworkId())
-                  .setRevision(props.getProperty(BuildInfo.Key.GIT_REVISION.value))
-                  .setTag(props.getProperty(BuildInfo.Key.GIT_TAG.value))
-                  .setTimestamp(props.getProperty(BuildInfo.Key.TIMESTAMP.value))
-                  .setUser(props.getProperty(BuildInfo.Key.USER.value))
-                  .setMachine(props.getProperty(BuildInfo.Key.MACHINE.value))
-                  .setVersion(CURRENT_API_VERSION));
-        }
-
-        @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-          if (snapshot.isSetSchedulerMetadata()) {
-            // No delete necessary here since this is a single value.
-
-            store.getSchedulerStore()
-                .saveFrameworkId(snapshot.getSchedulerMetadata().getFrameworkId());
-          }
-        }
-      },
-      new SnapshotField() {
-        @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
-          for (Map.Entry<String, IQuota> entry : store.getQuotaStore().fetchQuotas().entrySet()) {
-            quotas.add(new QuotaConfiguration(entry.getKey(), entry.getValue().newBuilder()));
-          }
-
-          snapshot.setQuotaConfigurations(quotas.build());
-        }
-
-        @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-          store.getQuotaStore().deleteQuotas();
-
-          if (snapshot.isSetQuotaConfigurations()) {
-            for (QuotaConfiguration quota : snapshot.getQuotaConfigurations()) {
-              store.getQuotaStore().saveQuota(quota.getRole(), IQuota.build(quota.getQuota()));
-            }
-          }
-        }
-      },
-      new SnapshotField() {
-        @Override public void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
-          snapshot.setLocks(ILock.toBuildersSet(store.getLockStore().fetchLocks()));
-        }
-
-        @Override public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
-          store.getLockStore().deleteLocks();
-
-          if (snapshot.isSetLocks()) {
-            for (Lock lock : snapshot.getLocks()) {
-              store.getLockStore().saveLock(ILock.build(lock));
-            }
-          }
-        }
-      }
-  );
-
-  private final Clock clock;
-  private final Storage storage;
-
-  @Inject
-  public SnapshotStoreImpl(Clock clock, @Volatile Storage storage) {
-    this.clock = checkNotNull(clock);
-    this.storage = checkNotNull(storage);
-  }
-
-  @Timed("snapshot_create")
-  @Override public Snapshot createSnapshot() {
-    return storage.consistentRead(new Work.Quiet<Snapshot>() {
-      @Override public Snapshot apply(StoreProvider storeProvider) {
-        Snapshot snapshot = new Snapshot();
-
-        // Capture the timestamp at the start of the snapshot operation, but apply it afterwards
-        // in case one of the field closures tries to set its own timestamp.
-        long timestamp = clock.nowMillis();
-        for (SnapshotField field : SNAPSHOT_FIELDS) {
-          field.saveToSnapshot(storeProvider, snapshot);
-        }
-        snapshot.setTimestamp(timestamp);
-        return snapshot;
-      }
-    });
-  }
-
-  @Timed("snapshot_apply")
-  @Override public void applySnapshot(final Snapshot snapshot) {
-    checkNotNull(snapshot);
-
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override protected void execute(MutableStoreProvider storeProvider) {
-        LOG.info("Restoring snapshot.");
-
-        for (SnapshotField field : SNAPSHOT_FIELDS) {
-          field.restoreFromSnapshot(storeProvider, snapshot);
-        }
-      }
-    });
-  }
-
-  private interface SnapshotField {
-    void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
-
-    void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
-  }
-}
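
A usage sketch for the snapshot store above: in the scheduler, Clock and the @Volatile Storage are injected by Guice, but constructing the store directly shows the create/apply round trip. SnapshotRoundTrip is an illustrative wrapper, not part of the codebase.

import com.twitter.aurora.gen.storage.Snapshot;
import com.twitter.aurora.scheduler.storage.SnapshotStore;
import com.twitter.aurora.scheduler.storage.Storage;
import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
import com.twitter.common.util.Clock;

// Sketch: round-tripping scheduler state through the snapshot store.
final class SnapshotRoundTrip {
  private SnapshotRoundTrip() { }

  static Snapshot backup(Storage storage) {
    SnapshotStore<Snapshot> store = new SnapshotStoreImpl(Clock.SYSTEM_CLOCK, storage);
    // Reads every delegate store under a single consistent read.
    return store.createSnapshot();
  }

  static void restore(Storage storage, Snapshot snapshot) {
    SnapshotStore<Snapshot> store = new SnapshotStoreImpl(Clock.SYSTEM_CLOCK, storage);
    // Deletes and repopulates each delegate store inside one write transaction.
    store.applySnapshot(snapshot);
  }
}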

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java b/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java
deleted file mode 100644
index a4c0126..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/log/testing/LogOpMatcher.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.log.testing;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.easymock.IExpectationSetters;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.LogEntry;
-import com.twitter.aurora.gen.storage.Op;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.gen.storage.Transaction;
-import com.twitter.aurora.gen.storage.storageConstants;
-import com.twitter.aurora.scheduler.log.Log.Position;
-import com.twitter.aurora.scheduler.log.Log.Stream;
-
-import static org.easymock.EasyMock.expect;
-
-/**
- * An EasyMock argument matcher that detects same-value {@link LogEntry} objects in a more
- * human-readable way than byte array comparison.
- */
-public class LogOpMatcher implements IArgumentMatcher {
-  private final LogEntry expected;
-
-  public LogOpMatcher(LogEntry expected) {
-    this.expected = expected;
-  }
-
-  @Override
-  public boolean matches(Object argument) {
-    try {
-      return expected.equals(ThriftBinaryCodec.decodeNonNull(LogEntry.class, (byte[]) argument));
-    } catch (CodingException e) {
-      return false;
-    }
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append(expected);
-  }
-
-  /**
-   * Creates a stream matcher that will set expectations on the provided {@code stream}.
-   *
-   * @param stream Mocked stream.
-   * @return A stream matcher to set expectations against {@code stream}.
-   */
-  public static StreamMatcher matcherFor(Stream stream) {
-    return new StreamMatcher(stream);
-  }
-
-  public static final class StreamMatcher {
-    private final Stream stream;
-
-    private StreamMatcher(Stream stream) {
-      this.stream = Preconditions.checkNotNull(stream);
-    }
-
-    /**
-     * Sets an expectation for a stream transaction containing the provided {@code ops}.
-     *
-     * @param ops Operations to expect in the transaction.
-     * @return An expectation setter.
-     */
-    public IExpectationSetters<Position> expectTransaction(Op... ops) {
-      LogEntry entry = LogEntry.transaction(
-          new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
-      return expect(stream.append(sameEntry(entry)));
-    }
-
-    /**
-     * Sets an expectation for a snapshot.
-     *
-     * @param snapshot Expected snapshot.
-     * @return An expectation setter.
-     */
-    public IExpectationSetters<Position> expectSnapshot(Snapshot snapshot) {
-      LogEntry entry = LogEntry.snapshot(snapshot);
-      return expect(stream.append(sameEntry(entry)));
-    }
-  }
-
-  /**
-   * Creates a matcher that supports value matching between a serialized {@link LogEntry} byte
-   * array and a log entry object.
-   *
-   * @param entry Entry to match against.
-   * @return Always {@code null}; the return value exists only for EasyMock-style embedding.
-   */
-  private static byte[] sameEntry(LogEntry entry) {
-    EasyMock.reportMatcher(new LogOpMatcher(entry));
-    return null;
-  }
-}
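
The matcher above is intended for EasyMock tests against a mocked Stream. A minimal sketch of that usage follows, assuming a JUnit/EasyMock test setup and the SaveFrameworkId op from storage.thrift; LogWritingTest is an illustrative name.

import org.easymock.EasyMock;
import org.junit.Test;

import com.twitter.aurora.gen.storage.Op;
import com.twitter.aurora.gen.storage.SaveFrameworkId;
import com.twitter.aurora.scheduler.log.Log.Position;
import com.twitter.aurora.scheduler.log.Log.Stream;
import com.twitter.aurora.scheduler.storage.log.testing.LogOpMatcher;

// Sketch of an EasyMock test that expects a transaction on a mocked stream.
public class LogWritingTest {

  @Test
  public void expectsTransaction() {
    Stream stream = EasyMock.createMock(Stream.class);
    Position position = EasyMock.createMock(Position.class);

    // Any storage Op works here; SaveFrameworkId is just a small, concrete example.
    Op op = Op.saveFrameworkId(new SaveFrameworkId("framework-id"));
    LogOpMatcher.matcherFor(stream).expectTransaction(op).andReturn(position);
    EasyMock.replay(stream, position);

    // ... exercise the code under test, which should append a transaction containing `op` ...

    EasyMock.verify(stream, position);
  }
}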

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java
deleted file mode 100644
index bf05caa..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/Interner.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * An interning pool that can be used to retrieve the canonical instance of an object, while
- * maintaining a reference count for each canonical instance.
- *
- * @param <T> The interned object type.
- * @param <A> The type used for maintaining associations.
- */
-class Interner<T, A> {
-
-  private final Map<T, InternEntry<A, T>> pool = Maps.newHashMap();
-
-  /**
-   * Retrieves the canonical instance of {@code t} and associates {@code association} with the
-   * interned value.  If {@code t} was not previously interned, the provided instance is stored.
-   *
-   * @param t The object to intern, or get the previously-interned value for.
-   * @param association A value to associate with {@code t}.
-   * @return The interned value, which may be reference-equivalent to {@code t}.
-   */
-  synchronized T addAssociation(T t, A association) {
-    InternEntry<A, T> entry = pool.get(t);
-    if (entry == null) {
-      entry = new InternEntry<>(t, association);
-      pool.put(t, entry);
-    } else {
-      entry.associations.add(association);
-    }
-    return entry.interned;
-  }
-
-  /**
-   * Removes an association with an interned value, effectively decrementing the reference count.
-   *
-   * @param t The interned value that {@code association} was associated with.
-   * @param association The association to remove.
-   */
-  synchronized void removeAssociation(T t, A association) {
-    InternEntry<A, T> entry = pool.get(t);
-    if (entry != null) {
-      entry.associations.remove(association);
-      if (entry.associations.isEmpty()) {
-        pool.remove(t);
-      }
-    }
-  }
-
-  /**
-   * Removes all interned values and associations.
-   */
-  synchronized void clear() {
-    pool.clear();
-  }
-
-  @VisibleForTesting
-  synchronized boolean isInterned(T t) {
-    return pool.containsKey(t);
-  }
-
-  @VisibleForTesting
-  synchronized Set<A> getAssociations(T t) {
-    return ImmutableSet.copyOf(pool.get(t).associations);
-  }
-
-  private static class InternEntry<A, T> {
-    private final T interned;
-    private final Set<A> associations = Sets.newHashSet();
-
-    InternEntry(T interned, A initialAssociation) {
-      this.interned = interned;
-      associations.add(initialAssociation);
-    }
-  }
-}
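
A small sketch of the reference-counting behavior above; since Interner is package-private, the example is assumed to live in the same package, and InternerExample is an illustrative name.

package com.twitter.aurora.scheduler.storage.mem;

// Sketch: reference-counted interning. The pool evicts an entry once its last
// association is removed.
final class InternerExample {
  public static void main(String[] args) {
    Interner<String, Integer> interner = new Interner<>();

    // Deliberately create two equal-but-distinct String instances to show canonicalization.
    String canonical = interner.addAssociation(new String("prod/web/0"), 1);
    String again = interner.addAssociation(new String("prod/web/0"), 2);
    System.out.println(canonical == again);                  // true: same canonical instance

    interner.removeAssociation("prod/web/0", 1);
    System.out.println(interner.isInterned("prod/web/0"));   // true: association 2 remains
    interner.removeAssociation("prod/web/0", 2);
    System.out.println(interner.isInterned("prod/web/0"));   // false: entry evicted
  }
}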

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java
deleted file mode 100644
index 6c383c7..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemAttributeStore.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.scheduler.storage.AttributeStore.Mutable;
-
-/**
- * An in-memory attribute store.
- */
-class MemAttributeStore implements Mutable {
-  private final ConcurrentMap<String, HostAttributes> hostAttributes = Maps.newConcurrentMap();
-
-  @Override
-  public void deleteHostAttributes() {
-    hostAttributes.clear();
-  }
-
-  @Override
-  public void saveHostAttributes(HostAttributes attributes) {
-    hostAttributes.putIfAbsent(attributes.getHost(), attributes);
-
-    HostAttributes stored = hostAttributes.get(attributes.getHost());
-    if (!stored.isSetMode()) {
-      stored.setMode(attributes.isSetMode() ? attributes.getMode() : MaintenanceMode.NONE);
-    }
-    stored.setAttributes(attributes.isSetAttributes()
-        ? attributes.getAttributes() : ImmutableSet.<Attribute>of());
-  }
-
-  @Override
-  public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
-    HostAttributes stored = hostAttributes.get(host);
-    if (stored != null) {
-      stored.setMode(mode);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public Optional<HostAttributes> getHostAttributes(String host) {
-    return Optional.fromNullable(hostAttributes.get(host));
-  }
-
-  @Override
-  public Set<HostAttributes> getHostAttributes() {
-    return ImmutableSet.copyOf(hostAttributes.values());
-  }
-}
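
A short sketch of the mode-defaulting behavior in saveHostAttributes above, assuming the usual chained setters and all-field constructors on the generated HostAttributes and Attribute thrift classes; MemAttributeStoreExample is an illustrative, same-package class.

package com.twitter.aurora.scheduler.storage.mem;

import com.google.common.collect.ImmutableSet;

import com.twitter.aurora.gen.Attribute;
import com.twitter.aurora.gen.HostAttributes;
import com.twitter.aurora.gen.MaintenanceMode;

// Sketch: saving host attributes without a mode defaults the stored record to NONE.
final class MemAttributeStoreExample {
  public static void main(String[] args) {
    MemAttributeStore store = new MemAttributeStore();

    // Save attributes for a host without specifying a maintenance mode.
    store.saveHostAttributes(new HostAttributes()
        .setHost("host-1")
        .setAttributes(ImmutableSet.of(new Attribute("rack", ImmutableSet.of("r1")))));

    System.out.println(store.getHostAttributes("host-1").get().getMode());  // NONE

    store.setMaintenanceMode("host-1", MaintenanceMode.DRAINING);
    System.out.println(store.getHostAttributes("host-1").get().getMode());  // DRAINING
  }
}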

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java
deleted file mode 100644
index 8268e8d..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemJobStore.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.storage.JobStore;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * An in-memory job store.
- */
-class MemJobStore implements JobStore.Mutable {
-
-  private final LoadingCache<String, Manager> managers = CacheBuilder.newBuilder()
-      .build(new CacheLoader<String, Manager>() {
-        @Override public Manager load(String key) {
-          return new Manager();
-        }
-      });
-
-  @Override
-  public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
-    checkNotNull(managerId);
-    checkNotNull(jobConfig);
-
-    IJobKey key = JobKeys.assertValid(jobConfig.getKey());
-    managers.getUnchecked(managerId).jobs.put(key, jobConfig);
-  }
-
-  @Override
-  public void removeJob(IJobKey jobKey) {
-    checkNotNull(jobKey);
-
-    for (Manager manager : managers.asMap().values()) {
-      manager.jobs.remove(jobKey);
-    }
-  }
-
-  @Override
-  public void deleteJobs() {
-    managers.invalidateAll();
-  }
-
-  @Override
-  public Iterable<IJobConfiguration> fetchJobs(String managerId) {
-    checkNotNull(managerId);
-
-    @Nullable Manager manager = managers.getIfPresent(managerId);
-    if (manager == null) {
-      return ImmutableSet.of();
-    }
-
-    synchronized (manager.jobs) {
-      return ImmutableSet.copyOf(manager.jobs.values());
-    }
-  }
-
-  @Override
-  public Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey) {
-    checkNotNull(managerId);
-    checkNotNull(jobKey);
-
-    Optional<Manager> manager = Optional.fromNullable(managers.getIfPresent(managerId));
-    if (!manager.isPresent()) {
-      return Optional.absent();
-    } else {
-      return Optional.fromNullable(manager.get().jobs.get(jobKey));
-    }
-  }
-
-  @Override
-  public Set<String> fetchManagerIds() {
-    return ImmutableSet.copyOf(managers.asMap().keySet());
-  }
-
-  private static class Manager {
-    private final Map<IJobKey, IJobConfiguration> jobs = Maps.newConcurrentMap();
-  }
-}
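
A brief sketch of how jobs are bucketed per manager ID in the store above; MemJobStoreExample is an illustrative same-package class, and jobConfig is assumed to carry a key that passes JobKeys.assertValid.

package com.twitter.aurora.scheduler.storage.mem;

import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
import com.twitter.aurora.scheduler.storage.entities.IJobKey;

// Sketch: storing and fetching a job under one manager ID.
final class MemJobStoreExample {
  static void roundTrip(IJobConfiguration jobConfig) {
    MemJobStore jobStore = new MemJobStore();

    jobStore.saveAcceptedJob("cron-manager", jobConfig);
    for (IJobConfiguration config : jobStore.fetchJobs("cron-manager")) {
      IJobKey key = config.getKey();
      System.out.println(key.getRole() + "/" + key.getEnvironment() + "/" + key.getName());
    }

    // removeJob clears the job from every manager, but the manager ID itself stays
    // registered until deleteJobs() invalidates the whole cache.
    jobStore.removeJob(jobConfig.getKey());
    System.out.println(jobStore.fetchManagerIds());
  }
}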

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java
deleted file mode 100644
index 1ced973..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemLockStore.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.scheduler.storage.LockStore;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-
-/**
- * An in-memory lock store.
- */
-class MemLockStore implements LockStore.Mutable {
-
-  private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap();
-
-  @Override
-  public void saveLock(ILock lock) {
-    locks.put(lock.getKey(), lock);
-  }
-
-  @Override
-  public void removeLock(ILockKey lockKey) {
-    locks.remove(lockKey);
-  }
-
-  @Override
-  public void deleteLocks() {
-    locks.clear();
-  }
-
-  @Override
-  public Set<ILock> fetchLocks() {
-    return ImmutableSet.copyOf(locks.values());
-  }
-
-  @Override
-  public Optional<ILock> fetchLock(ILockKey lockKey) {
-    return Optional.fromNullable(locks.get(lockKey));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java
deleted file mode 100644
index 855b39c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemQuotaStore.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.Map;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-import com.twitter.aurora.scheduler.storage.QuotaStore;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * An in-memory quota store.
- */
-class MemQuotaStore implements QuotaStore.Mutable {
-
-  private final Map<String, IQuota> quotas = Maps.newConcurrentMap();
-
-  @Override
-  public void deleteQuotas() {
-    quotas.clear();
-  }
-
-  @Override
-  public void removeQuota(String role) {
-    checkNotNull(role);
-
-    quotas.remove(role);
-  }
-
-  @Override
-  public void saveQuota(String role, IQuota quota) {
-    checkNotNull(role);
-    checkNotNull(quota);
-
-    quotas.put(role, quota);
-  }
-
-  @Override
-  public Optional<IQuota> fetchQuota(String role) {
-    checkNotNull(role);
-    return Optional.fromNullable(quotas.get(role));
-  }
-
-  @Override
-  public Map<String, IQuota> fetchQuotas() {
-    return ImmutableMap.copyOf(quotas);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java
deleted file mode 100644
index 19bf4ec..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/mem/MemSchedulerStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.mem;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.Atomics;
-
-import com.twitter.aurora.scheduler.storage.SchedulerStore;
-
-/**
- * An in-memory scheduler store.
- */
-class MemSchedulerStore implements SchedulerStore.Mutable {
-  private final AtomicReference<String> frameworkId = Atomics.newReference();
-
-  @Override
-  public void saveFrameworkId(String newFrameworkId) {
-    frameworkId.set(newFrameworkId);
-  }
-
-  @Nullable
-  @Override
-  public String fetchFrameworkId() {
-    return frameworkId.get();
-  }
-}