You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/10/16 02:06:36 UTC

git commit: Add a flag to deduplicate storage snapshots

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 0ee44a533 -> 253cb7370


Add a flag to deduplicate storage snapshots

Add a new format for deduplicated storage snapshots. Microbenchmarks
show a 10x deduplication ratio on Twitter's production snapshots.

This format is backwards-incompatible, so this patch introduces a
flag to control its use (defaulting off).

This only changes the format used to write to the replicated log (where
time is of the essence since all writes are done holding the global
storage lock) - the format of backups written to disk is unchanged,
as backups don't hold the lock.

Testing Done:
./gradlew -Pq build

Bugs closed: AURORA-722

Reviewed at https://reviews.apache.org/r/26478/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/253cb737
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/253cb737
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/253cb737

Branch: refs/heads/master
Commit: 253cb7370d6055c5b931a256f292b86973cbacfd
Parents: 0ee44a5
Author: Kevin Sweeney <ke...@apache.org>
Authored: Wed Oct 15 17:05:45 2014 -0700
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Wed Oct 15 17:05:45 2014 -0700

----------------------------------------------------------------------
 config/legacy_untested_classes.txt              |   3 +-
 docs/deploying-aurora-scheduler.md              |   4 +
 docs/scheduler-storage.md                       |  47 ++++++
 .../apache/aurora/codec/ThriftBinaryCodec.java  |   7 +-
 .../scheduler/storage/log/EntrySerializer.java  |   6 +-
 .../scheduler/storage/log/LogManager.java       |  16 +-
 .../scheduler/storage/log/LogStorage.java       |   3 +
 .../scheduler/storage/log/LogStorageModule.java |  33 ++--
 .../storage/log/SnapshotDeduplicator.java       | 151 +++++++++++++++++++
 .../scheduler/storage/log/StreamManager.java    |   2 +-
 .../storage/log/StreamManagerImpl.java          |  36 +++--
 .../thrift/org/apache/aurora/gen/storage.thrift |  31 ++++
 .../scheduler/storage/log/LogManagerTest.java   |  13 +-
 .../scheduler/storage/log/LogStorageTest.java   |   6 +-
 .../log/SnapshotDeduplicatorImplTest.java       | 131 ++++++++++++++++
 15 files changed, 445 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/config/legacy_untested_classes.txt
----------------------------------------------------------------------
diff --git a/config/legacy_untested_classes.txt b/config/legacy_untested_classes.txt
index 3af9986..542ac31 100644
--- a/config/legacy_untested_classes.txt
+++ b/config/legacy_untested_classes.txt
@@ -2,7 +2,6 @@ org/apache/aurora/auth/UnsecureAuthModule$UnsecureCapabilityValidator$1
 org/apache/aurora/auth/UnsecureAuthModule$UnsecureCapabilityValidator$2
 org/apache/aurora/auth/UnsecureAuthModule$UnsecureSessionValidator
 org/apache/aurora/auth/UnsecureAuthModule$UnsecureSessionValidator$1
-org/apache/aurora/codec/ThriftBinaryCodec$CodingException
 org/apache/aurora/Protobufs$1
 org/apache/aurora/scheduler/app/SchedulerMain$2
 org/apache/aurora/scheduler/app/SchedulerMain$3
@@ -74,4 +73,4 @@ org/apache/aurora/scheduler/storage/mem/Util$1
 org/apache/aurora/scheduler/thrift/aop/AopModule$2$1
 org/apache/aurora/scheduler/thrift/aop/AopModule$2$2
 org/apache/aurora/scheduler/thrift/auth/ThriftAuthModule$1
-org/apache/aurora/scheduler/updater/UpdateConfigurationException
\ No newline at end of file
+org/apache/aurora/scheduler/updater/UpdateConfigurationException

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/docs/deploying-aurora-scheduler.md
----------------------------------------------------------------------
diff --git a/docs/deploying-aurora-scheduler.md b/docs/deploying-aurora-scheduler.md
index 20f5f38..380577e 100644
--- a/docs/deploying-aurora-scheduler.md
+++ b/docs/deploying-aurora-scheduler.md
@@ -92,6 +92,10 @@ Failing to do this will result the following message when you try to start the s
 
     Replica in EMPTY status received a broadcasted recover request
 
+## Storage Performance Considerations
+
+See [this document](scheduler-storage.md) for scheduler storage performance considerations.
+
 ## Network considerations
 The Aurora scheduler listens on 2 ports - an HTTP port used for client RPCs and a web UI,
 and a libprocess (HTTP+Protobuf) port used to communicate with the Mesos master and for the log

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/docs/scheduler-storage.md
----------------------------------------------------------------------
diff --git a/docs/scheduler-storage.md b/docs/scheduler-storage.md
new file mode 100644
index 0000000..1cd02f8
--- /dev/null
+++ b/docs/scheduler-storage.md
@@ -0,0 +1,47 @@
+# Snapshot Performance
+
+Periodically the scheduler writes a full snapshot of its state to the replicated log. To do this
+it needs to hold a global storage write lock while it writes out this data. In large clusters
+this has been observed to take up to 40 seconds. Long pauses can cause issues in the system,
+including delays in scheduling new tasks.
+
+The scheduler has two optimizations to reduce the size of snapshots and thus improve snapshot
+performance: compression and deduplication. Most users will want to enable both compression
+and deduplication.
+
+## Compression
+
+To reduce the size of the snapshot the DEFLATE algorithm can be applied to the serialized bytes
+of the snapshot as they are written to the stream. This reduces the total number of bytes that
+need to be written to the replicated log at the cost of CPU and generally reduces the amount
+of time a snapshot takes. Most users will want to enable both compression and deduplication.
+
+### Enabling Compression
+
+Snapshot compression is enabled via the `-deflate_snapshots` flag. This is the default since
+Aurora 0.5.0. All released versions of Aurora can read both compressed and uncompressed snapshots,
+so there are no backwards compatibility concerns associated with changing this flag.
+
+### Disabling compression
+
+Disable compression by passing `-deflate_snapshots=false`.
+
+## Deduplication
+
+In Aurora 0.6.0 a new snapshot format was introduced. Rather than write one configuration blob
+per Mesos task this format stores each configuration blob once, and each Mesos task with a
+pointer to its blob. This format is not backwards compatible with earlier versions of Aurora.
+
+### Enabling Deduplication
+
+After upgrading Aurora to 0.6.0, enable deduplication with the `-deduplicate_snapshots` flag.
+After the first snapshot the cluster will be using the deduplicated format to write to the
+replicated log. Snapshots are created periodically by the scheduler (according to
+the `-dlog_snapshot_interval` flag). An administrator can also force a snapshot operation with
+`aurora_admin snapshot`.
+
+### Disabling Deduplication
+
+To disable deduplication, for example to rollback to Aurora, restart all of the cluster's
+schedulers with `-deduplicate_snapshots=false` and either wait for a snapshot or force one
+using `aurora_admin snapshot`.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
index 65e986e..32a8a81 100644
--- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -131,6 +131,11 @@ public final class ThriftBinaryCodec {
   // "If the memory is available, buffers sizes on the order of 128K or 256K bytes should be used."
   private static final int DEFLATER_BUFFER_SIZE = Amount.of(256, Data.KB).as(Data.BYTES);
 
+  // Empirical from microbenchmarks (assuming 20MiB/s writes to the replicated log and a large
+  // de-duplicated Snapshot from a production environment).
+  // TODO(ksweeney): Consider making this configurable.
+  private static final int DEFLATE_LEVEL = 3;
+
   /**
    * Encodes a thrift object into a DEFLATE-compressed binary array.
    *
@@ -151,7 +156,7 @@ public final class ThriftBinaryCodec {
       // See http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4986239
       TTransport transport = new TIOStreamTransport(
           new BufferedOutputStream(
-              new DeflaterOutputStream(outBytes, new Deflater(), DEFLATER_BUFFER_SIZE),
+              new DeflaterOutputStream(outBytes, new Deflater(DEFLATE_LEVEL), DEFLATER_BUFFER_SIZE),
               DEFLATER_BUFFER_SIZE));
       TProtocol protocol = PROTOCOL_FACTORY.getProtocol(transport);
       tBase.write(protocol);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
index 7239a6a..cbb711f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
@@ -82,12 +82,14 @@ public interface EntrySerializer {
       return frames;
     }
 
-    private byte[] checksum(byte[] data) {
+    @Timed("log_entry_checksum")
+    protected byte[] checksum(byte[] data) {
       // TODO(ksweeney): Use the streaming API here.
       return hashFunction.hashBytes(data).asBytes();
     }
 
-    private static byte[] encode(Frame frame) throws CodingException {
+    @Timed("log_entry_encode")
+    protected byte[] encode(Frame frame) throws CodingException {
       return Entries.thriftBinaryEncode(LogEntry.frame(frame));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
index 4b50e20..4099503 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
@@ -34,23 +34,31 @@ public class LogManager {
    * Identifies the maximum log entry size to permit before chunking entries into frames.
    */
   @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @Target({ElementType.METHOD, ElementType.PARAMETER})
   @Qualifier
   public @interface MaxEntrySize { }
 
   /**
-   * Binding annotation for settings regarding the way snapshots are written.
+   * When true, enable snapshot deflation.
    */
   @Retention(RetentionPolicy.RUNTIME)
-  @Target({ ElementType.PARAMETER, ElementType.METHOD })
+  @Target({ElementType.METHOD, ElementType.PARAMETER})
   @Qualifier
   public @interface DeflateSnapshots { }
 
   /**
+   * When true, enable snapshot deduplication.
+   */
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD, ElementType.PARAMETER})
+  @Qualifier
+  public @interface DeduplicateSnapshots { }
+
+  /**
    * Hash function used to verify log entries.
    */
   @Retention(RetentionPolicy.RUNTIME)
-  @Target({ElementType.PARAMETER, ElementType.METHOD})
+  @Target({ElementType.METHOD, ElementType.PARAMETER})
   @Qualifier
   public @interface LogEntryHashFunction { }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index f806297..ec9ccfd 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -389,6 +389,9 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       case FRAME:
         throw new IllegalArgumentException("Framed entries are not handled at this layer.");
 
+      case DEDUPLICATED_SNAPSHOT:
+        throw new IllegalArgumentException("Deduplicated snapshots are not handled at this layer.");
+
       default:
         throw new IllegalStateException("Unknown log entry type: " + logEntry);
     }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index 769348e..73348f3 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -24,33 +24,26 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.twitter.common.application.ShutdownRegistry;
 import com.twitter.common.args.Arg;
 import com.twitter.common.args.CmdLine;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Data;
 import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
 
-import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
 import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.log.LogManager.DeduplicateSnapshots;
 import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
 import org.apache.aurora.scheduler.storage.log.LogStorage.ShutdownGracePeriod;
 import org.apache.aurora.scheduler.storage.log.LogStorage.SnapshotInterval;
 
 import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
+import static org.apache.aurora.scheduler.storage.log.LogManager.DeflateSnapshots;
+import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
+import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
 
 /**
  * Bindings for scheduler distributed log based storage.
- * <p/>
- * Requires bindings for:
- * <ul>
- *   <li>{@link Clock}</li>
- *   <li>{@link ShutdownRegistry}</li>
- *   <li>The concrete {@link Log} implementation.</li>
- * </ul>
- * <p/>
  */
 public class LogStorageModule extends AbstractModule {
 
@@ -73,22 +66,24 @@ public class LogStorageModule extends AbstractModule {
   public static final Arg<Amount<Integer, Data>> MAX_LOG_ENTRY_SIZE =
       Arg.create(Amount.of(512, Data.KB));
 
+  @CmdLine(name = "deduplicate_snapshots",
+      help = "Write snapshots in deduplicated format. For details and backwards compatibility "
+          + "concerns see docs/scheduler-storage.md.")
+  private static final Arg<Boolean> DEDUPLICATE_SNAPSHOTS = Arg.create(false);
+
   @CmdLine(name = "deflate_snapshots", help = "Whether snapshots should be deflate-compressed.")
   private static final Arg<Boolean> DEFLATE_SNAPSHOTS = Arg.create(true);
 
   @Override
   protected void configure() {
-    requireBinding(Log.class);
-    requireBinding(Clock.class);
-    requireBinding(ShutdownRegistry.class);
-
     bindInterval(ShutdownGracePeriod.class, SHUTDOWN_GRACE_PERIOD);
     bindInterval(SnapshotInterval.class, SNAPSHOT_INTERVAL);
 
     bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
         .toInstance(MAX_LOG_ENTRY_SIZE.get());
     bind(LogManager.class).in(Singleton.class);
-    bindConstant().annotatedWith(LogManager.DeflateSnapshots.class).to(DEFLATE_SNAPSHOTS.get());
+    bindConstant().annotatedWith(DeduplicateSnapshots.class).to(DEDUPLICATE_SNAPSHOTS.get());
+    bindConstant().annotatedWith(DeflateSnapshots.class).to(DEFLATE_SNAPSHOTS.get());
     bind(LogStorage.class).in(Singleton.class);
 
     install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class));
@@ -97,13 +92,13 @@ public class LogStorageModule extends AbstractModule {
     bind(EntrySerializer.class).to(EntrySerializerImpl.class);
     // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
     // versus a faster error-detection checksum like CRC32 for large Snapshots.
-    bind(HashFunction.class).annotatedWith(LogManager.LogEntryHashFunction.class)
-        .toInstance(Hashing.md5());
+    bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(Hashing.md5());
+
+    bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
 
     install(new FactoryModuleBuilder()
         .implement(StreamManager.class, StreamManagerImpl.class)
         .build(StreamManagerFactory.class));
-
   }
 
   private void bindInterval(Class<? extends Annotation> key, Arg<Amount<Long, Time>> value) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
new file mode 100644
index 0000000..7b46740
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicator.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.DeduplicatedScheduledTask;
+import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
+import org.apache.aurora.gen.storage.Snapshot;
+
+/**
+ * Converter between denormalized storage Snapshots and de-duplicated snapshots.
+ *
+ * <p>
+ * For information on the difference in the two formats see the documentation in storage.thrift.
+ */
+public interface SnapshotDeduplicator {
+  /**
+   * Convert a Snapshot to the deduplicated format.
+   *
+   * @param snapshot Snapshot to convert.
+   * @return deduplicated snapshot.
+   */
+  DeduplicatedSnapshot deduplicate(Snapshot snapshot);
+
+  /**
+   * Restore a deduplicated snapshot to its original denormalized form.
+   *
+   * @param snapshot Deduplicated snapshot to restore.
+   * @return A full snapshot.
+   * @throws CodingException when the input data is corrupt.
+   */
+  Snapshot reduplicate(DeduplicatedSnapshot snapshot) throws CodingException;
+
+  class SnapshotDeduplicatorImpl implements SnapshotDeduplicator {
+    private static final Logger LOG = Logger.getLogger(SnapshotDeduplicatorImpl.class.getName());
+
+    private static final Function<ScheduledTask, TaskConfig> SCHEDULED_TO_CONFIG =
+        new Function<ScheduledTask, TaskConfig>() {
+          @Override
+          public TaskConfig apply(ScheduledTask task) {
+            return task.getAssignedTask().getTask();
+          }
+        };
+
+    private static ScheduledTask deepCopyWithoutTaskConfig(ScheduledTask scheduledTask) {
+      ScheduledTask task = scheduledTask.deepCopy();
+      task.getAssignedTask().unsetTask();
+      return task;
+    }
+
+    @Override
+    @Timed("snapshot_deduplicate")
+    public DeduplicatedSnapshot deduplicate(Snapshot snapshot) {
+      int numInputTasks = snapshot.getTasksSize();
+      LOG.info(String.format("Starting deduplication of a snapshot with %d tasks.", numInputTasks));
+
+      Snapshot partialSnapshot = snapshot.deepCopy();
+      partialSnapshot.unsetTasks();
+
+      DeduplicatedSnapshot deduplicatedSnapshot = new DeduplicatedSnapshot()
+          .setPartialSnapshot(partialSnapshot);
+
+      // Nothing to do if we don't have any input tasks.
+      if (!snapshot.isSetTasks()) {
+        LOG.warning("Got snapshot with unset tasks field.");
+        return deduplicatedSnapshot;
+      }
+
+      // Match each unique TaskConfig to its hopefully-multiple ScheduledTask owners.
+      ListMultimap<TaskConfig, ScheduledTask> index = Multimaps.index(
+          snapshot.getTasks(),
+          SCHEDULED_TO_CONFIG);
+
+      for (Entry<TaskConfig, List<ScheduledTask>> entry : Multimaps.asMap(index).entrySet()) {
+        deduplicatedSnapshot.addToTaskConfigs(entry.getKey());
+        for (ScheduledTask scheduledTask : entry.getValue()) {
+          deduplicatedSnapshot.addToPartialTasks(new DeduplicatedScheduledTask()
+              .setPartialScheduledTask(deepCopyWithoutTaskConfig(scheduledTask))
+              .setTaskConfigId(deduplicatedSnapshot.getTaskConfigsSize() - 1));
+        }
+      }
+
+      int numOutputTasks = deduplicatedSnapshot.getTaskConfigsSize();
+
+      LOG.info(String.format(
+          "Finished deduplicating snapshot. Deduplication ratio: %d/%d = %.2f%%.",
+          numInputTasks,
+          numOutputTasks,
+          100.0 * numInputTasks / numOutputTasks));
+
+      return deduplicatedSnapshot;
+    }
+
+    @Override
+    @Timed("snapshot_reduplicate")
+    public Snapshot reduplicate(DeduplicatedSnapshot deduplicatedSnapshot) throws CodingException {
+      LOG.info("Starting reduplication.");
+      Snapshot snapshot = new Snapshot(deduplicatedSnapshot.getPartialSnapshot());
+      if (!deduplicatedSnapshot.isSetTaskConfigs()) {
+        LOG.warning("Got deduplicated snapshot with unset task configs.");
+        return snapshot;
+      }
+
+      for (DeduplicatedScheduledTask partialTask : deduplicatedSnapshot.getPartialTasks()) {
+        ScheduledTask scheduledTask = new ScheduledTask(partialTask.getPartialScheduledTask());
+        int taskConfigId = partialTask.getTaskConfigId();
+        TaskConfig config;
+        try {
+          config = deduplicatedSnapshot.getTaskConfigs().get(taskConfigId);
+        } catch (IndexOutOfBoundsException e) {
+          throw new CodingException(
+              "DeduplicatedScheduledTask referenced invalid task index " + taskConfigId, e);
+        }
+        scheduledTask.getAssignedTask().setTask(config);
+        snapshot.addToTasks(scheduledTask);
+      }
+
+      int numInputTasks = deduplicatedSnapshot.getTaskConfigsSize();
+      int numOutputTasks = snapshot.getTasksSize();
+      LOG.info(String.format(
+          "Finished reduplicating snapshot. Compression ratio: %d/%d = %.2f%%.",
+          numInputTasks,
+          numOutputTasks,
+          100.0 * numInputTasks / numOutputTasks));
+
+      return snapshot;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index 22db80e..9047682 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -30,7 +30,7 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
  * operations can be committed atomically, or the log can be compacted by
  * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
  */
-interface StreamManager {
+public interface StreamManager {
   /**
    * Reads all entries in the log stream after the given position.  If the position
    * supplied is {@code null} then all log entries in the stream will be read.

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index e5cfbf5..855573e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -23,7 +23,6 @@ import java.util.logging.Logger;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -47,6 +46,7 @@ import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
 import org.apache.aurora.scheduler.log.Log;
+import org.apache.aurora.scheduler.log.Log.Stream;
 
 import static java.util.Objects.requireNonNull;
 
@@ -55,10 +55,10 @@ import static com.twitter.common.inject.TimedInterceptor.Timed;
 import static org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import static org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
 import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import static org.apache.aurora.scheduler.storage.log.LogManager.DeduplicateSnapshots;
 import static org.apache.aurora.scheduler.storage.log.LogManager.DeflateSnapshots;
 import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
 
-@VisibleForTesting
 class StreamManagerImpl implements StreamManager {
   private static final Logger LOG = Logger.getLogger(StreamManagerImpl.class.getName());
 
@@ -81,18 +81,24 @@ class StreamManagerImpl implements StreamManager {
   private final EntrySerializer entrySerializer;
   private final boolean deflateSnapshots;
   private final HashFunction hashFunction;
+  private final SnapshotDeduplicator snapshotDeduplicator;
+  private final boolean deduplicateSnapshots;
 
   @Inject
   StreamManagerImpl(
-      @Assisted Log.Stream stream,
+      @Assisted Stream stream,
       EntrySerializer entrySerializer,
       @DeflateSnapshots boolean deflateSnapshots,
-      @LogEntryHashFunction HashFunction hashFunction) {
+      @LogEntryHashFunction HashFunction hashFunction,
+      SnapshotDeduplicator snapshotDeduplicator,
+      @DeduplicateSnapshots boolean deduplicateSnapshots) {
 
     this.stream = requireNonNull(stream);
     this.entrySerializer = requireNonNull(entrySerializer);
     this.deflateSnapshots = deflateSnapshots;
     this.hashFunction = requireNonNull(hashFunction);
+    this.snapshotDeduplicator = requireNonNull(snapshotDeduplicator);
+    this.deduplicateSnapshots = deduplicateSnapshots;
   }
 
   @Override
@@ -112,6 +118,11 @@ class StreamManagerImpl implements StreamManager {
           vars.deflatedEntriesRead.incrementAndGet();
         }
 
+        if (logEntry.isSetDeduplicatedSnapshot()) {
+          logEntry = LogEntry.snapshot(
+              snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
+        }
+
         reader.execute(logEntry);
         vars.entriesRead.incrementAndGet();
       }
@@ -186,18 +197,23 @@ class StreamManagerImpl implements StreamManager {
   public StreamTransactionImpl startTransaction() {
     return new StreamTransactionImpl();
   }
+
   @Override
   @Timed("log_manager_snapshot")
   public void snapshot(Snapshot snapshot)
       throws CodingException, InvalidPositionException, StreamAccessException {
 
     LogEntry entry;
-    if (deflateSnapshots) {
-      entry = deflate(snapshot);
+    if (deduplicateSnapshots) {
+      entry = LogEntry.deduplicatedSnapshot(snapshotDeduplicator.deduplicate(snapshot));
     } else {
       entry = LogEntry.snapshot(snapshot);
     }
 
+    if (deflateSnapshots) {
+      entry = deflate(entry);
+    }
+
     Log.Position position = appendAndGetPosition(entry);
     vars.snapshots.incrementAndGet();
     vars.unSnapshottedTransactions.set(0);
@@ -206,17 +222,15 @@ class StreamManagerImpl implements StreamManager {
 
   // Not meant to be subclassed, but timed methods must be non-private.
   // See https://github.com/google/guice/wiki/AOP#limitations
-  @VisibleForTesting
   @Timed("log_manager_deflate")
-  LogEntry deflate(Snapshot snapshot) throws CodingException {
-    return Entries.deflate(LogEntry.snapshot(snapshot));
+  protected LogEntry deflate(LogEntry entry) throws CodingException {
+    return Entries.deflate(entry);
   }
 
   // Not meant to be subclassed, but timed methods must be non-private.
   // See https://github.com/google/guice/wiki/AOP#limitations
-  @VisibleForTesting
   @Timed("log_manager_append")
-  Log.Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
+  protected Log.Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
     Log.Position firstPosition = null;
     byte[][] entries = entrySerializer.serialize(logEntry);
     synchronized (writeMutex) { // ensure all sub-entries are written as a unit

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage.thrift b/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 5350ec9..3798797 100644
--- a/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -198,16 +198,43 @@ union Frame {
   2: FrameChunk chunk
 }
 
+// A ScheduledTask with its assignedTask.task field set to null. Deserializers must fill in
+// assignedTask.task with the TaskConfig identified by taskConfigId (which is an index into the
+// DeduplicatedSnapshot's taskConfigs list).
+struct DeduplicatedScheduledTask {
+  1: api.ScheduledTask partialScheduledTask
+  2: i32 taskConfigId
+}
+
+// A Snapshot that has had duplicate TaskConfig structs removed to save space. The
+// partialSnapshot field is a normal Snapshot with the tasks field set to null. To create the
+// full Snapshot deserializers must fill in this field with the result of recreating each
+// partial task using the referenced entry in taskConfigs.
+struct DeduplicatedSnapshot {
+   // Snapshot with its tasks field unset.
+   1: Snapshot partialSnapshot
+   // ScheduledTasks that have had their assignedTask.task field replaced with an ID to save space.
+   2: list<DeduplicatedScheduledTask> partialTasks
+   // Ordered list of taskConfigs. The taskConfigId field of DeduplicatedScheduledTask is an index
+   // into this.
+   3: list<api.TaskConfig> taskConfigs
+}
+
 // A scheduler storage write-ahead log entry consisting of no-ops to skip over or else snapshots or
 // transactions to apply.  Any entry type can also be chopped up into frames if the entry is too big
 // for whatever reason.
 union LogEntry {
+  // The full state of the scheduler at some point-in-time. Transactions appearing before this
+  // entry in the log can be ignored.
   1: Snapshot snapshot
+
+  // An incremental update to apply to the scheduler storage.
   2: Transaction transaction
 
   // The value should be ignored - both true and false signal an equivalent no operation marker.
   3: bool noop;
 
+  // A frame that can be reassembled with others to form a complete LogEntry.
   4: Frame frame
 
   // A LogEntry that is first serialized in the thrift binary format,
@@ -215,5 +242,9 @@ union LogEntry {
   // Deflated entries are expected to be un-framed.  They may be pieced together by multiple frames,
   // but the contents of the deflated entry should not be a Frame.
   5: binary deflatedEntry
+
+  // The full state of the scheduler at some point-in-time, in a compact layout. Transactions
+  // appearing before this entry in the log can be ignored.
+  6: DeduplicatedSnapshot deduplicatedSnapshot
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 39729b3..d4aaa67 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -64,6 +64,7 @@ import org.easymock.IArgumentMatcher;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -107,7 +108,9 @@ public class LogManagerTest extends EasyMockTest {
         stream,
         new EntrySerializer.EntrySerializerImpl(maxEntrySize, Hashing.md5()),
         false,
-        Hashing.md5());
+        Hashing.md5(),
+        new SnapshotDeduplicatorImpl(),
+        false);
   }
 
   @Test
@@ -400,7 +403,9 @@ public class LogManagerTest extends EasyMockTest {
         mockStream,
         new EntrySerializer.EntrySerializerImpl(message1.chunkSize, Hashing.md5()),
         false,
-        Hashing.md5());
+        Hashing.md5(),
+        new SnapshotDeduplicatorImpl(),
+        false);
     StreamTransaction tr1 = streamManager.startTransaction();
     tr1.add(op1);
 
@@ -509,7 +514,9 @@ public class LogManagerTest extends EasyMockTest {
         stream,
         new EntrySerializer.EntrySerializerImpl(NO_FRAMES_EVER_SIZE, md5),
         true,
-        md5);
+        md5,
+        new SnapshotDeduplicatorImpl(),
+        false);
     streamManager.snapshot(snapshot);
     streamManager.readFromBeginning(reader);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 7a8c3b8..2d44bd5 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -129,10 +129,12 @@ public class LogStorageTest extends EasyMockTest {
   private SchedulingService schedulingService;
   private SnapshotStore<Snapshot> snapshotStore;
   private StorageTestUtil storageUtil;
+  private SnapshotDeduplicator snapshotDeduplicator;
 
   @Before
   public void setUp() {
     log = createMock(Log.class);
+    snapshotDeduplicator = createMock(SnapshotDeduplicator.class);
 
     StreamManagerFactory streamManagerFactory = new StreamManagerFactory() {
       @Override
@@ -144,7 +146,9 @@ public class LogStorageTest extends EasyMockTest {
                 Amount.of(1, Data.GB),
                 md5),
             false,
-            md5);
+            md5,
+            snapshotDeduplicator,
+            false);
       }
     };
     LogManager logManager = new LogManager(log, streamManagerFactory);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/253cb737/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicatorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicatorImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicatorImplTest.java
new file mode 100644
index 0000000..5546cf6
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotDeduplicatorImplTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.storage.DeduplicatedScheduledTask;
+import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
+import org.apache.aurora.gen.storage.SchedulerMetadata;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SnapshotDeduplicatorImplTest {
+  private final SnapshotDeduplicator snapshotDeduplicator = new SnapshotDeduplicatorImpl();
+
+  private final Map<String, TaskConfig> taskIdToConfig = ImmutableMap.of(
+      "task1", makeConfig("a"),
+      "task2", makeConfig("a"),
+      "task3", makeConfig("b"));
+
+  private TaskConfig makeConfig(String data) {
+    return new TaskConfig()
+        .setExecutorConfig(new ExecutorConfig()
+            .setData(data));
+  }
+
+  private ScheduledTask makeTask(String taskId, TaskConfig config) {
+    return new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(taskId)
+            .setTask(config));
+  }
+
+  private Snapshot makeSnapshot() {
+    Snapshot snapshot = new Snapshot()
+        .setSchedulerMetadata(new SchedulerMetadata()
+            .setFrameworkId("test"));
+
+    for (Entry<String, TaskConfig> entry : taskIdToConfig.entrySet()) {
+      snapshot.addToTasks(makeTask(entry.getKey(), entry.getValue()));
+    }
+
+    return snapshot;
+  }
+
+  @Test
+  public void testRoundTrip() throws Exception {
+    Snapshot snapshot = makeSnapshot();
+
+    assertEquals(
+        snapshot,
+        snapshotDeduplicator.reduplicate(snapshotDeduplicator.deduplicate(snapshot)));
+  }
+
+  @Test
+  public void testDeduplicatedFormat() {
+    DeduplicatedSnapshot deduplicatedSnapshot = snapshotDeduplicator.deduplicate(makeSnapshot());
+
+    assertEquals(
+        "The tasks field of the partial snapshot should be empty.",
+        0,
+        deduplicatedSnapshot.getPartialSnapshot().getTasksSize());
+
+    assertEquals(
+        "The total number of task configs should be equal to the number of unique task configs.",
+        2,
+        deduplicatedSnapshot.getTaskConfigsSize());
+
+    assertEquals(
+        ImmutableSet.of(makeConfig("a"), makeConfig("b")),
+        ImmutableSet.copyOf(deduplicatedSnapshot.getTaskConfigs()));
+
+    for (DeduplicatedScheduledTask task : deduplicatedSnapshot.getPartialTasks()) {
+      assertEquals(
+          "The deduplicated task should have the correct index into the taskConfigs table.",
+          taskIdToConfig.get(task.getPartialScheduledTask().getAssignedTask().getTaskId()),
+          deduplicatedSnapshot.getTaskConfigs().get(task.getTaskConfigId()));
+
+      assertNull(
+          "The task config field of partial scheduled tasks should be null.",
+          task.getPartialScheduledTask().getAssignedTask().getTask());
+    }
+  }
+
+  @Test(expected = CodingException.class)
+  public void testReduplicateFailure() throws Exception {
+    DeduplicatedSnapshot corrupt = new DeduplicatedSnapshot()
+        .setPartialSnapshot(new Snapshot().setSchedulerMetadata(new SchedulerMetadata()))
+        .setPartialTasks(ImmutableList.of(
+            new DeduplicatedScheduledTask()
+                .setPartialScheduledTask(new ScheduledTask())
+                .setTaskConfigId(1)))
+        .setTaskConfigs(ImmutableList.of(new TaskConfig()));
+
+    snapshotDeduplicator.reduplicate(corrupt);
+  }
+
+  @Test
+  public void testEmptyRoundTrip() throws Exception {
+    Snapshot snapshot = new Snapshot();
+
+    assertEquals(
+        snapshot,
+        snapshotDeduplicator.reduplicate(snapshotDeduplicator.deduplicate(snapshot)));
+  }
+}