Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/04 08:39:33 UTC

flink git commit: [FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed state in full checkpoints and savepoints

Repository: flink
Updated Branches:
  refs/heads/master d17a4b9d0 -> 5171513a3


[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed state in full checkpoints and savepoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5171513a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5171513a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5171513a

Branch: refs/heads/master
Commit: 5171513a3c48d9ba1bd642225ee35cd8c194cb99
Parents: d17a4b9
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jun 13 18:07:50 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jul 4 10:17:26 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 187 +++++++++++--------
 .../flink/api/common/ExecutionConfig.java       |  20 +-
 flink-runtime/pom.xml                           |   6 +
 .../state/AbstractKeyedStateBackend.java        |  31 ++-
 .../state/KeyedBackendSerializationProxy.java   |  30 ++-
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   2 +
 .../state/SnappyStreamCompressionDecorator.java |  54 ++++++
 .../state/StatePartitionStreamProvider.java     |   6 +-
 .../state/StreamCompressionDecorator.java       |  73 ++++++++
 .../UncompressedStreamCompressionDecorator.java |  48 +++++
 .../state/heap/HeapKeyedStateBackend.java       |  50 +++--
 .../state/heap/StateTableByKeyGroupReaders.java |   1 +
 .../runtime/util/ForwardingInputStream.java     |  83 ++++++++
 .../runtime/util/ForwardingOutputStream.java    |  63 +++++++
 .../util/NonClosingInputStreamDecorator.java    |  40 ++++
 .../util/NonClosingOutpusStreamDecorator.java   |  41 ++++
 .../runtime/util/NonClosingStreamDecorator.java |  79 --------
 .../runtime/state/SerializationProxiesTest.java |   7 +-
 .../state/StateSnapshotCompressionTest.java     | 180 ++++++++++++++++++
 ...tractEventTimeWindowCheckpointingITCase.java |   2 +
 .../test/checkpointing/RescalingITCase.java     |  15 +-
 21 files changed, 828 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
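
The feature is opt-in through a new ExecutionConfig flag (see the ExecutionConfig diff below); note that incremental RocksDB checkpoints still write uncompressed, since sst files are already compressed. A minimal sketch of enabling it from a job (class name and job setup are illustrative only):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnableSnapshotCompression {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // opt in to snappy compression of keyed state in full checkpoints and savepoints
            env.getConfig().setUseSnapshotCompression(true);
            // ... define the job and call env.execute() as usual
        }
    }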


http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 7cbfb15..291973c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -64,10 +64,13 @@ import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -97,7 +100,9 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -106,6 +111,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
@@ -603,7 +609,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 
 			KeyedBackendSerializationProxy<K> serializationProxy =
-					new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
+				new KeyedBackendSerializationProxy<>(
+					stateBackend.getKeySerializer(),
+					metaInfoSnapshots,
+					!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator));
 
 			serializationProxy.write(outputView);
 		}
@@ -612,71 +621,88 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			byte[] previousKey = null;
 			byte[] previousValue = null;
+			OutputStream kgOutStream = null;
+			DataOutputView kgOutView = null;
 
-			// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
-			try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+			try {
+				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
+				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
 					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
 
-				// handover complete, null out to prevent double close
-				kvStateIterators = null;
+					// handover complete, null out to prevent double close
+					kvStateIterators = null;
 
-				//preamble: setup with first key-group as our lookahead
-				if (mergeIterator.isValid()) {
-					//begin first key-group by recording the offset
-					keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
-					//write the k/v-state id as metadata
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					outputView.writeShort(mergeIterator.kvStateId());
-					previousKey = mergeIterator.key();
-					previousValue = mergeIterator.value();
-					mergeIterator.next();
-				}
+					//preamble: setup with first key-group as our lookahead
+					if (mergeIterator.isValid()) {
+						//begin first key-group by recording the offset
+						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+						//write the k/v-state id as metadata
+						kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
+						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+						kgOutView.writeShort(mergeIterator.kvStateId());
+						previousKey = mergeIterator.key();
+						previousValue = mergeIterator.value();
+						mergeIterator.next();
+					}
 
-				//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
-				while (mergeIterator.isValid()) {
+					//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+					while (mergeIterator.isValid()) {
 
-					assert (!hasMetaDataFollowsFlag(previousKey));
+						assert (!hasMetaDataFollowsFlag(previousKey));
 
-					//set signal in first key byte that meta data will follow in the stream after this k/v pair
-					if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+						//set signal in first key byte that meta data will follow in the stream after this k/v pair
+						if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
 
-						//be cooperative and check for interruption from time to time in the hot loop
-						checkInterrupted();
+							//be cooperative and check for interruption from time to time in the hot loop
+							checkInterrupted();
 
-						setMetaDataFollowsFlagInKey(previousKey);
-					}
+							setMetaDataFollowsFlagInKey(previousKey);
+						}
 
-					writeKeyValuePair(previousKey, previousValue);
+						writeKeyValuePair(previousKey, previousValue, kgOutView);
 
-					//write meta data if we have to
-					if (mergeIterator.isNewKeyGroup()) {
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						outputView.writeShort(END_OF_KEY_GROUP_MARK);
-						//begin new key-group
-						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
-						//write the kev-state
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						outputView.writeShort(mergeIterator.kvStateId());
-					} else if (mergeIterator.isNewKeyValueState()) {
-						//write the k/v-state
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						outputView.writeShort(mergeIterator.kvStateId());
+						//write meta data if we have to
+						if (mergeIterator.isNewKeyGroup()) {
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+							// this will just close the outer stream
+							kgOutStream.close();
+							//begin new key-group
+							keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+							//write the k/v-state
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
+							kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+							kgOutView.writeShort(mergeIterator.kvStateId());
+						} else if (mergeIterator.isNewKeyValueState()) {
+							//write the k/v-state
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							kgOutView.writeShort(mergeIterator.kvStateId());
+						}
+
+						//request next k/v pair
+						previousKey = mergeIterator.key();
+						previousValue = mergeIterator.value();
+						mergeIterator.next();
 					}
+				}
 
-					//request next k/v pair
-					previousKey = mergeIterator.key();
-					previousValue = mergeIterator.value();
-					mergeIterator.next();
+				//epilogue: write last key-group
+				if (previousKey != null) {
+					assert (!hasMetaDataFollowsFlag(previousKey));
+					setMetaDataFollowsFlagInKey(previousKey);
+					writeKeyValuePair(previousKey, previousValue, kgOutView);
+					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+					kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+					// this will just close the outer stream
+					kgOutStream.close();
+					kgOutStream = null;
 				}
-			}
 
-			//epilogue: write last key-group
-			if (previousKey != null) {
-				assert (!hasMetaDataFollowsFlag(previousKey));
-				setMetaDataFollowsFlagInKey(previousKey);
-				writeKeyValuePair(previousKey, previousValue);
-				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-				outputView.writeShort(END_OF_KEY_GROUP_MARK);
+			} finally {
+				// this will just close the outer stream
+				IOUtils.closeQuietly(kgOutStream);
 			}
 		}
 
@@ -687,9 +713,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
 		}
 
-		private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
-			BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView);
-			BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView);
+		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
+			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
+			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
 		}
 
 		static void setMetaDataFollowsFlagInKey(byte[] key) {
@@ -808,8 +834,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
 				closeableRegistry.registerClosable(outputStream);
 
+				//no need for compression scheme support because sst-files are already compressed
 				KeyedBackendSerializationProxy<K> serializationProxy =
-					new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
+					new KeyedBackendSerializationProxy<>(
+						stateBackend.keySerializer,
+						stateMetaInfoSnapshots,
+						false);
+
 				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
 
 				serializationProxy.write(out);
@@ -1044,6 +1075,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private DataInputView currentStateHandleInView;
 		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
 		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
+		/** The compression decorator that was used for writing the state, as determined by the meta data. */
+		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
 
 		/**
 		 * Creates a restore operation object for the given state backend instance.
@@ -1132,6 +1165,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					"Aborting now since state migration is currently not available");
 			}
 
+			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
+
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 					serializationProxy.getStateMetaInfoSnapshots();
 			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
@@ -1188,27 +1224,30 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				if (0L != offset) {
 					currentStateHandleInStream.seek(offset);
 					boolean keyGroupHasMoreKeys = true;
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					int kvStateId = currentStateHandleInView.readShort();
-					ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-					//insert all k/v pairs into DB
-					while (keyGroupHasMoreKeys) {
-						byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
-						byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
-						if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-							//clear the signal bit in the key to make it ready for insertion again
-							RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-							rocksDBKeyedStateBackend.db.put(handle, key, value);
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-									& currentStateHandleInView.readShort();
-							if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-								keyGroupHasMoreKeys = false;
+					try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
+						DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
+						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+						int kvStateId = compressedKgInputView.readShort();
+						ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+						//insert all k/v pairs into DB
+						while (keyGroupHasMoreKeys) {
+							byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+							byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+							if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+								//clear the signal bit in the key to make it ready for insertion again
+								RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+								rocksDBKeyedStateBackend.db.put(handle, key, value);
+								//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+								kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+									& compressedKgInputView.readShort();
+								if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+									keyGroupHasMoreKeys = false;
+								} else {
+									handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+								}
 							} else {
-								handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+								rocksDBKeyedStateBackend.db.put(handle, key, value);
 							}
-						} else {
-							rocksDBKeyedStateBackend.db.put(handle, key, value);
 						}
 					}
 				}
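
Distilled from the hunks above: every key-group is now framed by its own (possibly compressing) stream on top of the shared checkpoint stream, offsets are recorded against the shared stream, and only the decorating layer is closed at a key-group boundary. A condensed sketch of that pattern (the method, its parameters, and the writeShort stand-in for the real metadata are illustrative, not Flink API):

    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.runtime.state.StreamCompressionDecorator;

    import java.io.IOException;
    import java.io.OutputStream;

    public final class KeyGroupFramingSketch {

        /** Frames each key-group with its own compression stream over a shared stream. */
        static void writeKeyGroups(
                OutputStream sharedStream,
                StreamCompressionDecorator decorator,
                int numKeyGroups) throws IOException {

            for (int keyGroup = 0; keyGroup < numKeyGroups; keyGroup++) {
                // the real snapshot records the shared stream position here as the key-group offset
                OutputStream kgOutStream = decorator.decorateWithCompression(sharedStream);
                DataOutputView kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                kgOutView.writeShort(keyGroup); // stand-in for the k/v-state id metadata
                // ... serialize the key-group's k/v pairs through kgOutView ...
                // closes only the decorating layer and flushes its frame; sharedStream stays open
                kgOutStream.close();
            }
        }
    }

The same framing is what makes restore work per key-group: the reader seeks to the recorded offset and decorates the input stream with the same scheme, as in the restore hunk above.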

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1..fc66ccd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common;
 
-import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.TaskManagerOptions;
 
+import com.esotericsoftware.kryo.Serializer;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -146,6 +147,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 */
 	private long taskCancellationTimeoutMillis = -1;
 
+	/** This flag defines whether we use compression for the state snapshot data. Default: false */
+	private boolean useSnapshotCompression = false;
+
 	// ------------------------------- User code values --------------------------------------------
 
 	private GlobalJobParameters globalJobParameters;
@@ -840,6 +844,14 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 		this.autoTypeRegistrationEnabled = false;
 	}
 
+	public boolean isUseSnapshotCompression() {
+		return useSnapshotCompression;
+	}
+
+	public void setUseSnapshotCompression(boolean useSnapshotCompression) {
+		this.useSnapshotCompression = useSnapshotCompression;
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {
@@ -864,7 +876,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 				defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
 				registeredKryoTypes.equals(other.registeredKryoTypes) &&
 				registeredPojoTypes.equals(other.registeredPojoTypes) &&
-				taskCancellationIntervalMillis == other.taskCancellationIntervalMillis;
+				taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
+				useSnapshotCompression == other.useSnapshotCompression;
 
 		} else {
 			return false;
@@ -891,7 +904,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 			defaultKryoSerializerClasses,
 			registeredKryoTypes,
 			registeredPojoTypes,
-			taskCancellationIntervalMillis);
+			taskCancellationIntervalMillis,
+			useSnapshotCompression);
 	}
 
 	public boolean canEqual(Object obj) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 602f788..654227a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -144,6 +144,12 @@ under the License.
 			<artifactId>zookeeper</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.xerial.snappy</groupId>
+			<artifactId>snappy-java</artifactId>
+			<version>1.1.4</version>
+		</dependency>
+
 		<!--
 		The KryoSerializer dynamically loads Kryo instances via Chill and requires that Chill
 		is in the classpath. Because we do not want to have transitive Scala dependencies

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2b225df..30ca22e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -98,13 +98,18 @@ public abstract class AbstractKeyedStateBackend<K>
 
 	private final ExecutionConfig executionConfig;
 
+	/**
+	 * Decorates the input and output streams to write key-groups compressed.
+	 */
+	protected final StreamCompressionDecorator keyGroupCompressionDecorator;
+
 	public AbstractKeyedStateBackend(
-			TaskKvStateRegistry kvStateRegistry,
-			TypeSerializer<K> keySerializer,
-			ClassLoader userCodeClassLoader,
-			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange,
-			ExecutionConfig executionConfig) {
+		TaskKvStateRegistry kvStateRegistry,
+		TypeSerializer<K> keySerializer,
+		ClassLoader userCodeClassLoader,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		ExecutionConfig executionConfig) {
 
 		this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -114,6 +119,15 @@ public abstract class AbstractKeyedStateBackend<K>
 		this.cancelStreamRegistry = new CloseableRegistry();
 		this.keyValueStatesByName = new HashMap<>();
 		this.executionConfig = executionConfig;
+		this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
+	}
+
+	private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
+		if (executionConfig != null && executionConfig.isUseSnapshotCompression()) {
+			return SnappyStreamCompressionDecorator.INSTANCE;
+		} else {
+			return UncompressedStreamCompressionDecorator.INSTANCE;
+		}
 	}
 
 	/**
@@ -394,4 +408,9 @@ public abstract class AbstractKeyedStateBackend<K>
 	public boolean supportsAsynchronousSnapshots() {
 		return false;
 	}
+
+	@VisibleForTesting
+	public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+		return keyGroupCompressionDecorator;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 2ff8cb6..30b7344 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -38,7 +38,11 @@ import java.util.List;
  */
 public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
 
-	public static final int VERSION = 3;
+	public static final int VERSION = 4;
+
+	//TODO allow for more (user defined) compression formats + backwards compatibility story.
+	/** This specifies if we use a compressed format to write the key-groups. */
+	private boolean usingKeyGroupCompression;
 
 	private TypeSerializer<K> keySerializer;
 	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
@@ -53,7 +57,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 
 	public KeyedBackendSerializationProxy(
 			TypeSerializer<K> keySerializer,
-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots,
+			boolean compression) {
+
+		this.usingKeyGroupCompression = compression;
 
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
@@ -75,6 +82,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 		return keySerializerConfigSnapshot;
 	}
 
+	public boolean isUsingKeyGroupCompression() {
+		return usingKeyGroupCompression;
+	}
+
 	@Override
 	public int getVersion() {
 		return VERSION;
@@ -83,13 +94,16 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 	@Override
 	public int[] getCompatibleVersions() {
 		// we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
-		return new int[] {VERSION, 2, 1};
+		return new int[] {VERSION, 3, 2, 1};
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
 
+		// write the compression format used to write each key-group
+		out.writeBoolean(usingKeyGroupCompression);
+
 		// write in a way to be fault tolerant of read failures when deserializing the key serializer
 		TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
 				out,
@@ -110,8 +124,16 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
 
+		final int readVersion = getReadVersion();
+
+		if (readVersion >= 4) {
+			usingKeyGroupCompression = in.readBoolean();
+		} else {
+			usingKeyGroupCompression = false;
+		}
+
 		// only starting from version 3, we have the key serializer and its config snapshot written
-		if (getReadVersion() >= 3) {
+		if (readVersion >= 3) {
 			Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig =
 					TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
 			this.keySerializer = (TypeSerializer<K>) keySerializerAndConfig.f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index 9108306..d4244e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -51,6 +51,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 			case 2:
 				return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
 
+			case 3:
 			// current version
 			case KeyedBackendSerializationProxy.VERSION:
 				return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
@@ -130,6 +131,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 				return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
 
 			// current version
+			case 3:
 			case KeyedBackendSerializationProxy.VERSION:
 				return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
new file mode 100644
index 0000000..194cb2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import org.xerial.snappy.SnappyFramedInputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This implementation decorates the stream with snappy compression.
+ */
+@Internal
+public class SnappyStreamCompressionDecorator extends StreamCompressionDecorator {
+
+	public static final StreamCompressionDecorator INSTANCE = new SnappyStreamCompressionDecorator();
+
+	private static final long serialVersionUID = 1L;
+
+	private static final int COMPRESSION_BLOCK_SIZE = 64 * 1024;
+	private static final double MIN_COMPRESSION_RATIO = 0.85d;
+
+	@Override
+	protected OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException {
+		return new SnappyFramedOutputStream(stream, COMPRESSION_BLOCK_SIZE, MIN_COMPRESSION_RATIO);
+	}
+
+	@Override
+	protected InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException {
+		return new SnappyFramedInputStream(stream, false);
+	}
+}
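
For reference, this decorator delegates to snappy-java's framed streams with a 64 KiB block size and a minimum compression ratio of 0.85. A self-contained round-trip through the same snappy-java API (payload and class name are illustrative):

    import org.xerial.snappy.SnappyFramedInputStream;
    import org.xerial.snappy.SnappyFramedOutputStream;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public final class SnappyFramedRoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] payload = "some keyed state bytes".getBytes(StandardCharsets.UTF_8);

            ByteArrayOutputStream raw = new ByteArrayOutputStream();
            // same parameters the decorator uses: 64 KiB blocks, minimum compression ratio 0.85
            try (SnappyFramedOutputStream out = new SnappyFramedOutputStream(raw, 64 * 1024, 0.85d)) {
                out.write(payload);
            }

            try (SnappyFramedInputStream in =
                     new SnappyFramedInputStream(new ByteArrayInputStream(raw.toByteArray()), false)) {
                byte[] buffer = new byte[payload.length];
                int bytesRead = in.read(buffer); // restores the original payload
            }
        }
    }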

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
index 8b07da8..50e1b3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.util.NonClosingStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -45,7 +45,7 @@ public class StatePartitionStreamProvider {
 	}
 
 	public StatePartitionStreamProvider(InputStream stream) {
-		this.stream = new NonClosingStreamDecorator(Preconditions.checkNotNull(stream));
+		this.stream = new NonClosingInputStreamDecorator(Preconditions.checkNotNull(stream));
 		this.creationException = null;
 	}
 
@@ -59,4 +59,4 @@ public class StatePartitionStreamProvider {
 		}
 		return stream;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
new file mode 100644
index 0000000..29bc519
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Implementations of this interface decorate streams with a compression scheme. Subclasses should be stateless.
+ */
+@Internal
+public abstract class StreamCompressionDecorator implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Decorates the stream by wrapping it into a stream that applies a compression.
+	 *
+	 * IMPORTANT: For streams returned by this method, {@link OutputStream#close()} is not propagated to the inner
+	 * stream. The inner stream must be closed separately.
+	 *
+	 * @param stream the stream to decorate.
+	 * @return an output stream that is decorated by the compression scheme.
+	 */
+	public final OutputStream decorateWithCompression(OutputStream stream) throws IOException {
+		return decorateWithCompression(new NonClosingOutpusStreamDecorator(stream));
+	}
+
+	/**
+	 * IMPORTANT: For streams returned by this method, {@link InputStream#close()} is not propagated to the inner
+	 * stream. The inner stream must be closed separately.
+	 *
+	 * @param stream the stream to decorate.
+	 * @return an input stream that is decorated by the compression scheme.
+	 */
+	public final InputStream decorateWithCompression(InputStream stream) throws IOException {
+		return decorateWithCompression(new NonClosingInputStreamDecorator(stream));
+	}
+
+	/**
+	 * @param stream the stream to decorate
+	 * @return an output stream that is decorated by the compression scheme.
+	 */
+	protected abstract OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException;
+
+	/**
+	 * @param stream the stream to decorate.
+	 * @return an input stream that is decorated by the compression scheme.
+	 */
+	protected abstract InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException;
+}
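
The close-propagation contract in the javadoc above can be exercised directly: closing the decorated stream flushes its compression frame but, thanks to the non-closing wrappers, is never propagated to the inner stream. A minimal sketch against the snappy singleton (class name is illustrative):

    import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
    import org.apache.flink.runtime.state.StreamCompressionDecorator;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public final class CompressionDecoratorContract {
        public static void main(String[] args) throws IOException {
            StreamCompressionDecorator decorator = SnappyStreamCompressionDecorator.INSTANCE;

            ByteArrayOutputStream underlying = new ByteArrayOutputStream();
            OutputStream compressed = decorator.decorateWithCompression(underlying);
            compressed.write(new byte[] {1, 2, 3});
            // flushes the snappy frame; the close is NOT propagated to `underlying`
            compressed.close();

            InputStream decompressed =
                decorator.decorateWithCompression(new ByteArrayInputStream(underlying.toByteArray()));
            int firstByte = decompressed.read(); // 1
            decompressed.close(); // again, only the decorating layer is closed
        }
    }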

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
new file mode 100644
index 0000000..aa04020
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This implementation does not decorate the stream with any compression.
+ */
+@Internal
+public class UncompressedStreamCompressionDecorator extends StreamCompressionDecorator {
+
+	public static final StreamCompressionDecorator INSTANCE = new UncompressedStreamCompressionDecorator();
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	protected OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException {
+		return stream;
+	}
+
+	@Override
+	protected InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException {
+		return stream;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 35a70bb..055274f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.state.heap;
 
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -57,7 +55,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -67,15 +68,21 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.RunnableFuture;
 
 /**
@@ -316,7 +323,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		final KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
+			new KeyedBackendSerializationProxy<>(
+				keySerializer,
+				metaInfoSnapshots,
+				!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));
 
 		//--------------------------------------------------- this becomes the end of sync part
 
@@ -331,6 +341,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				@Override
 				public KeyGroupsStateHandle performOperation() throws Exception {
 					long asyncStartTime = System.currentTimeMillis();
+
 					CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
 					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
 					serializationProxy.write(outView);
@@ -343,8 +354,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						outView.writeInt(keyGroupId);
 
 						for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-							outView.writeShort(kVStateToId.get(kvState.getKey()));
-							cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+							OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream);
+							DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
+							kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
+							cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
+							kgCompressionOut.close(); // this will just close the outer stream
 						}
 					}
 
@@ -492,6 +506,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					}
 				}
 
+				final StreamCompressionDecorator streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+					SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
+
 				for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
 					int keyGroupIndex = groupOffset.f0;
 					long offset = groupOffset.f1;
@@ -503,19 +520,26 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					int writtenKeyGroupIndex = inView.readInt();
 
-					Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+					try (InputStream kgCompressionInStream =
+							 streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+
+						DataInputViewStreamWrapper kgCompressionInView =
+							new DataInputViewStreamWrapper(kgCompressionInStream);
+
+						Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
 							"Unexpected key-group in restore.");
 
-					for (int i = 0; i < restoredMetaInfos.size(); i++) {
-						int kvStateId = inView.readShort();
-						StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+						for (int i = 0; i < restoredMetaInfos.size(); i++) {
+							int kvStateId = kgCompressionInView.readShort();
+							StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
 
-						StateTableByKeyGroupReader keyGroupReader =
+							StateTableByKeyGroupReader keyGroupReader =
 								StateTableByKeyGroupReaders.readerForVersion(
-										stateTable,
-										serializationProxy.getReadVersion());
+									stateTable,
+									serializationProxy.getReadVersion());
 
-						keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+							keyGroupReader.readMappingsInKeyGroup(kgCompressionInView, keyGroupIndex);
+						}
 					}
 				}
 			} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index d7bc94e..0b69a87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -48,6 +48,7 @@ class StateTableByKeyGroupReaders {
 				return new StateTableByKeyGroupReaderV1<>(table);
 			case 2:
 			case 3:
+			case 4:
 				return new StateTableByKeyGroupReaderV2V3<>(table);
 			default:
 				throw new IllegalArgumentException("Unknown version: " + version);

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
new file mode 100644
index 0000000..a539b8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Input stream that wraps another input stream and forwards all method calls to the wrapped stream.
+ */
+@Internal
+public class ForwardingInputStream extends InputStream {
+
+	private final InputStream delegate;
+
+	public ForwardingInputStream(InputStream delegate) {
+		this.delegate = Preconditions.checkNotNull(delegate);
+	}
+
+	@Override
+	public int read() throws IOException {
+		return delegate.read();
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return delegate.read(b);
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		return delegate.read(b, off, len);
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		return delegate.skip(n);
+	}
+
+	@Override
+	public int available() throws IOException {
+		return delegate.available();
+	}
+
+	@Override
+	public void close() throws IOException {
+		delegate.close();
+	}
+
+	@Override
+	public void mark(int readlimit) {
+		delegate.mark(readlimit);
+	}
+
+	@Override
+	public void reset() throws IOException {
+		delegate.reset();
+	}
+
+	@Override
+	public boolean markSupported() {
+		return delegate.markSupported();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
new file mode 100644
index 0000000..6026373
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream that wraps another output stream and forwards all method calls to the wrapped stream.
+ */
+@Internal
+public class ForwardingOutputStream extends OutputStream {
+
+	private final OutputStream delegate;
+
+	public ForwardingOutputStream(OutputStream delegate) {
+		this.delegate = Preconditions.checkNotNull(delegate);
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		delegate.write(b);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		delegate.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		delegate.write(b, off, len);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		delegate.flush();
+	}
+
+	@Override
+	public void close() throws IOException {
+		delegate.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
new file mode 100644
index 0000000..52738dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Decorator for input streams that ignores calls to {@link InputStream#close()}.
+ */
+@Internal
+public class NonClosingInputStreamDecorator extends ForwardingInputStream {
+
+	public NonClosingInputStreamDecorator(InputStream delegate) {
+		super(delegate);
+	}
+
+	@Override
+	public void close() throws IOException {
+		// ignore
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
new file mode 100644
index 0000000..e8f6183
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Decorator for output streams that ignores calls to {@link OutputStream#close()}.
+ */
+@Internal
+public class NonClosingOutpusStreamDecorator extends ForwardingOutputStream {
+
+
+	public NonClosingOutpusStreamDecorator(OutputStream delegate) {
+		super(delegate);
+	}
+
+	@Override
+	public void close() throws IOException {
+		// ignore
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
deleted file mode 100644
index ba7bc79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Decorator for input streams that ignores calls to {@link InputStream#close()}.
- */
-public class NonClosingStreamDecorator extends InputStream {
-
-	private final InputStream delegate;
-
-	public NonClosingStreamDecorator(InputStream delegate) {
-		this.delegate = delegate;
-	}
-
-	@Override
-	public int read() throws IOException {
-		return delegate.read();
-	}
-
-	@Override
-	public int read(byte[] b) throws IOException {
-		return delegate.read(b);
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		return delegate.read(b, off, len);
-	}
-
-	@Override
-	public long skip(long n) throws IOException {
-		return delegate.skip(n);
-	}
-
-	@Override
-	public int available() throws IOException {
-		return super.available();
-	}
-
-	@Override
-	public void close() throws IOException {
-		// ignore
-	}
-
-	@Override
-	public void mark(int readlimit) {
-		super.mark(readlimit);
-	}
-
-	@Override
-	public void reset() throws IOException {
-		super.reset();
-	}
-
-	@Override
-	public boolean markSupported() {
-		return super.markSupported();
-	}
-}
\ No newline at end of file
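
The class removed above had two shortcomings that the new hierarchy addresses: it existed only for input streams, and several of its overrides (available(), mark(), reset(), markSupported()) delegated to super, i.e. the InputStream defaults, rather than to the wrapped delegate. The commit replaces it with the ForwardingInputStream/ForwardingOutputStream base classes plus thin non-closing subclasses. A sketch of the read side, mirroring the write path above (local names are illustrative):

import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;

import java.io.IOException;
import java.io.InputStream;

class KeyGroupReadSketch {

	// Reads one compressed key group, leaving the restore stream open.
	static byte[] readKeyGroup(
			InputStream rawRestoreStream,
			StreamCompressionDecorator compression,
			int length) throws IOException {

		byte[] buffer = new byte[length];
		try (InputStream decompressed = compression.decorateWithCompression(
				new NonClosingInputStreamDecorator(rawRestoreStream))) {
			int pos = 0;
			while (pos < length) {
				int count = decompressed.read(buffer, pos, length - pos);
				if (count == -1) {
					throw new IOException("Unexpected end of key-group data");
				}
				pos += count;
			}
		}
		return buffer;
	}
}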

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 920aa69..341d4fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -64,7 +65,7 @@ public class SerializationProxiesTest {
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy<?> serializationProxy =
-				new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+				new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -79,6 +80,7 @@ public class SerializationProxiesTest {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
+		Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
 		Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 		Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
@@ -101,7 +103,7 @@ public class SerializationProxiesTest {
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy<?> serializationProxy =
-			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -122,6 +124,7 @@ public class SerializationProxiesTest {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
+		Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
 		Assert.assertEquals(null, serializationProxy.getKeySerializer());
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 

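The third constructor argument added above persists the compression choice into the snapshot metadata, so a restoring backend can pick the matching decorator regardless of its own current configuration. A sketch of that restore-side decision (the helper is hypothetical; the flag and both decorator singletons are from this commit):

import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;

class RestoreDecoratorSketch {

	// After proxy.read(...), the persisted flag decides how the
	// key-group data of this snapshot must be read back.
	static StreamCompressionDecorator forRestore(KeyedBackendSerializationProxy<?> proxy) {
		return proxy.isUsingKeyGroupCompression()
			? SnappyStreamCompressionDecorator.INSTANCE
			: UncompressedStreamCompressionDecorator.INSTANCE;
	}
}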
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
new file mode 100644
index 0000000..b932cb9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Mockito.mock;
+
+public class StateSnapshotCompressionTest {
+
+	@Test
+	public void testCompressionConfiguration() throws Exception {
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setUseSnapshotCompression(true);
+
+		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+			Assert.assertEquals(SnappyStreamCompressionDecorator.INSTANCE, stateBackend.getKeyGroupCompressionDecorator());
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+
+		executionConfig = new ExecutionConfig();
+		executionConfig.setUseSnapshotCompression(false);
+
+		stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+			Assert.assertEquals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.getKeyGroupCompressionDecorator());
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+	}
+
+	@Test
+	public void snapshotRestoreRoundtripWithCompression() throws Exception {
+		snapshotRestoreRoundtrip(true);
+	}
+
+	@Test
+	public void snapshotRestoreRoundtripUncompressed() throws Exception {
+		snapshotRestoreRoundtrip(false);
+	}
+
+	private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setUseSnapshotCompression(useCompression);
+
+		KeyedStateHandle stateHandle = null;
+
+		ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("test", String.class);
+		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+
+		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+
+			InternalValueState<VoidNamespace, String> state =
+				stateBackend.createValueState(
+					new VoidNamespaceSerializer(),
+					stateDescriptor);
+
+			stateBackend.setCurrentKey("A");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("42");
+			stateBackend.setCurrentKey("B");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("43");
+			stateBackend.setCurrentKey("C");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("44");
+			stateBackend.setCurrentKey("D");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("45");
+			CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024);
+			RunnableFuture<KeyedStateHandle> snapshot = stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+			snapshot.run();
+			stateHandle = snapshot.get();
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+
+		executionConfig = new ExecutionConfig();
+
+		stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+		try {
+
+			stateBackend.restore(Collections.singletonList(stateHandle));
+
+			InternalValueState<VoidNamespace, String> state = stateBackend.createValueState(
+				new VoidNamespaceSerializer(),
+				stateDescriptor);
+
+			stateBackend.setCurrentKey("A");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("42", state.value());
+			stateBackend.setCurrentKey("B");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("43", state.value());
+			stateBackend.setCurrentKey("C");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("44", state.value());
+			stateBackend.setCurrentKey("D");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("45", state.value());
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+	}
+}
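
testCompressionConfiguration() above pins down the write-side counterpart of the restore decision: the backend derives its key-group decorator from the ExecutionConfig. A sketch of the selection logic those assertions imply (the helper name is illustrative):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;

class SnapshotDecoratorSketch {

	// Write side: compression for new snapshots follows the job's
	// ExecutionConfig, as asserted via getKeyGroupCompressionDecorator().
	static StreamCompressionDecorator forSnapshot(ExecutionConfig config) {
		return config.isUseSnapshotCompression()
			? SnappyStreamCompressionDecorator.INSTANCE
			: UncompressedStreamCompressionDecorator.INSTANCE;
	}
}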

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..0021b81 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -111,6 +111,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		cluster.start();
 
 		env = new TestStreamEnvironment(cluster, PARALLELISM);
+		env.getConfig().setUseSnapshotCompression(true);
 	}
 
 	@AfterClass
@@ -318,6 +319,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
 
 			env
 					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
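
As the integration test changes show, users opt in per job through the ExecutionConfig; the feature is off by default. A minimal sketch for a user program (the topology is omitted and the job name is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class EnableSnapshotCompressionSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(10_000L);

		// Opt in to snappy compression of keyed state in full
		// checkpoints and savepoints (defaults to false).
		env.getConfig().setUseSnapshotCompression(true);

		// ... define the keyed streaming topology here ...

		env.execute("job-with-compressed-snapshots");
	}
}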

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 9df0d1a..a58ec51 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import io.netty.util.internal.ConcurrentSet;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -55,6 +54,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
+import io.netty.util.internal.ConcurrentSet;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +63,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -78,6 +74,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -679,6 +681,7 @@ public class RescalingITCase extends TestLogger {
 		}
 		env.enableCheckpointing(checkpointingInterval);
 		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().setUseSnapshotCompression(true);
 
 		DataStream<Integer> input = env.addSource(new SubtaskIndexSource(
 				numberKeys,