You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/04 08:39:33 UTC
flink git commit: [FLINK-6773] [checkpoint] Introduce compression
(snappy) for keyed state in full checkpoints and savepoints
Repository: flink
Updated Branches:
refs/heads/master d17a4b9d0 -> 5171513a3
[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed state in full checkpoints and savepoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5171513a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5171513a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5171513a
Branch: refs/heads/master
Commit: 5171513a3c48d9ba1bd642225ee35cd8c194cb99
Parents: d17a4b9
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jun 13 18:07:50 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jul 4 10:17:26 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 187 +++++++++++--------
.../flink/api/common/ExecutionConfig.java | 20 +-
flink-runtime/pom.xml | 6 +
.../state/AbstractKeyedStateBackend.java | 31 ++-
.../state/KeyedBackendSerializationProxy.java | 30 ++-
...ckendStateMetaInfoSnapshotReaderWriters.java | 2 +
.../state/SnappyStreamCompressionDecorator.java | 54 ++++++
.../state/StatePartitionStreamProvider.java | 6 +-
.../state/StreamCompressionDecorator.java | 73 ++++++++
.../UncompressedStreamCompressionDecorator.java | 48 +++++
.../state/heap/HeapKeyedStateBackend.java | 50 +++--
.../state/heap/StateTableByKeyGroupReaders.java | 1 +
.../runtime/util/ForwardingInputStream.java | 83 ++++++++
.../runtime/util/ForwardingOutputStream.java | 63 +++++++
.../util/NonClosingInputStreamDecorator.java | 40 ++++
.../util/NonClosingOutpusStreamDecorator.java | 41 ++++
.../runtime/util/NonClosingStreamDecorator.java | 79 --------
.../runtime/state/SerializationProxiesTest.java | 7 +-
.../state/StateSnapshotCompressionTest.java | 180 ++++++++++++++++++
...tractEventTimeWindowCheckpointingITCase.java | 2 +
.../test/checkpointing/RescalingITCase.java | 15 +-
21 files changed, 828 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 7cbfb15..291973c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -64,10 +64,13 @@ import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -97,7 +100,9 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
@@ -106,6 +111,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
@@ -603,7 +609,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
+ new KeyedBackendSerializationProxy<>(
+ stateBackend.getKeySerializer(),
+ metaInfoSnapshots,
+ !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator));
serializationProxy.write(outputView);
}
@@ -612,71 +621,88 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
byte[] previousKey = null;
byte[] previousValue = null;
+ OutputStream kgOutStream = null;
+ DataOutputView kgOutView = null;
- // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
- try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
+ try {
+ // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
+ try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
- // handover complete, null out to prevent double close
- kvStateIterators = null;
+ // handover complete, null out to prevent double close
+ kvStateIterators = null;
- //preamble: setup with first key-group as our lookahead
- if (mergeIterator.isValid()) {
- //begin first key-group by recording the offset
- keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
- //write the k/v-state id as metadata
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(mergeIterator.kvStateId());
- previousKey = mergeIterator.key();
- previousValue = mergeIterator.value();
- mergeIterator.next();
- }
+ //preamble: setup with first key-group as our lookahead
+ if (mergeIterator.isValid()) {
+ //begin first key-group by recording the offset
+ keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+ //write the k/v-state id as metadata
+ kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
+ kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(mergeIterator.kvStateId());
+ previousKey = mergeIterator.key();
+ previousValue = mergeIterator.value();
+ mergeIterator.next();
+ }
- //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
- while (mergeIterator.isValid()) {
+ //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+ while (mergeIterator.isValid()) {
- assert (!hasMetaDataFollowsFlag(previousKey));
+ assert (!hasMetaDataFollowsFlag(previousKey));
- //set signal in first key byte that meta data will follow in the stream after this k/v pair
- if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+ //set signal in first key byte that meta data will follow in the stream after this k/v pair
+ if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
- //be cooperative and check for interruption from time to time in the hot loop
- checkInterrupted();
+ //be cooperative and check for interruption from time to time in the hot loop
+ checkInterrupted();
- setMetaDataFollowsFlagInKey(previousKey);
- }
+ setMetaDataFollowsFlagInKey(previousKey);
+ }
- writeKeyValuePair(previousKey, previousValue);
+ writeKeyValuePair(previousKey, previousValue, kgOutView);
- //write meta data if we have to
- if (mergeIterator.isNewKeyGroup()) {
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(END_OF_KEY_GROUP_MARK);
- //begin new key-group
- keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
- //write the kev-state
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(mergeIterator.kvStateId());
- } else if (mergeIterator.isNewKeyValueState()) {
- //write the k/v-state
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(mergeIterator.kvStateId());
+ //write meta data if we have to
+ if (mergeIterator.isNewKeyGroup()) {
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+ // this will just close the outer stream
+ kgOutStream.close();
+ //begin new key-group
+ keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
+ //write the kev-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
+ kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+ kgOutView.writeShort(mergeIterator.kvStateId());
+ } else if (mergeIterator.isNewKeyValueState()) {
+ //write the k/v-state
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(mergeIterator.kvStateId());
+ }
+
+ //request next k/v pair
+ previousKey = mergeIterator.key();
+ previousValue = mergeIterator.value();
+ mergeIterator.next();
}
+ }
- //request next k/v pair
- previousKey = mergeIterator.key();
- previousValue = mergeIterator.value();
- mergeIterator.next();
+ //epilogue: write last key-group
+ if (previousKey != null) {
+ assert (!hasMetaDataFollowsFlag(previousKey));
+ setMetaDataFollowsFlagInKey(previousKey);
+ writeKeyValuePair(previousKey, previousValue, kgOutView);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+ // this will just close the outer stream
+ kgOutStream.close();
+ kgOutStream = null;
}
- }
- //epilogue: write last key-group
- if (previousKey != null) {
- assert (!hasMetaDataFollowsFlag(previousKey));
- setMetaDataFollowsFlagInKey(previousKey);
- writeKeyValuePair(previousKey, previousValue);
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- outputView.writeShort(END_OF_KEY_GROUP_MARK);
+ } finally {
+ // this will just close the outer stream
+ IOUtils.closeQuietly(kgOutStream);
}
}
@@ -687,9 +713,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
}
- private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
- BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView);
- BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView);
+ private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
+ BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
+ BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
}
static void setMetaDataFollowsFlagInKey(byte[] key) {
@@ -808,8 +834,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
closeableRegistry.registerClosable(outputStream);
+ //no need for compression scheme support because sst-files are already compressed
KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
+ new KeyedBackendSerializationProxy<>(
+ stateBackend.keySerializer,
+ stateMetaInfoSnapshots,
+ false);
+
DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
serializationProxy.write(out);
@@ -1044,6 +1075,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private DataInputView currentStateHandleInView;
/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
+ /** The compression decorator that was used for writing the state, as determined by the meta data. */
+ private StreamCompressionDecorator keygroupStreamCompressionDecorator;
/**
* Creates a restore operation object for the given state backend instance.
@@ -1132,6 +1165,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Aborting now since state migration is currently not available");
}
+ this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
+
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
@@ -1188,27 +1224,30 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (0L != offset) {
currentStateHandleInStream.seek(offset);
boolean keyGroupHasMoreKeys = true;
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- int kvStateId = currentStateHandleInView.readShort();
- ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
- //insert all k/v pairs into DB
- while (keyGroupHasMoreKeys) {
- byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
- byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
- if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
- //clear the signal bit in the key to make it ready for insertion again
- RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
- rocksDBKeyedStateBackend.db.put(handle, key, value);
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
- & currentStateHandleInView.readShort();
- if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
- keyGroupHasMoreKeys = false;
+ try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
+ DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ int kvStateId = compressedKgInputView.readShort();
+ ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ //insert all k/v pairs into DB
+ while (keyGroupHasMoreKeys) {
+ byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+ //clear the signal bit in the key to make it ready for insertion again
+ RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+ rocksDBKeyedStateBackend.db.put(handle, key, value);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+ & compressedKgInputView.readShort();
+ if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+ keyGroupHasMoreKeys = false;
+ } else {
+ handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ }
} else {
- handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ rocksDBKeyedStateBackend.db.put(handle, key, value);
}
- } else {
- rocksDBKeyedStateBackend.db.put(handle, key, value);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1..fc66ccd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,13 +18,14 @@
package org.apache.flink.api.common;
-import com.esotericsoftware.kryo.Serializer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.TaskManagerOptions;
+import com.esotericsoftware.kryo.Serializer;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -146,6 +147,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
private long taskCancellationTimeoutMillis = -1;
+ /** This flag defines if we use compression for the state snapshot data or not. Default: false */
+ private boolean useSnapshotCompression = false;
+
// ------------------------------- User code values --------------------------------------------
private GlobalJobParameters globalJobParameters;
@@ -840,6 +844,14 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
this.autoTypeRegistrationEnabled = false;
}
+ public boolean isUseSnapshotCompression() {
+ return useSnapshotCompression;
+ }
+
+ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
+ this.useSnapshotCompression = useSnapshotCompression;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof ExecutionConfig) {
@@ -864,7 +876,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
registeredKryoTypes.equals(other.registeredKryoTypes) &&
registeredPojoTypes.equals(other.registeredPojoTypes) &&
- taskCancellationIntervalMillis == other.taskCancellationIntervalMillis;
+ taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
+ useSnapshotCompression == other.useSnapshotCompression;
} else {
return false;
@@ -891,7 +904,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
defaultKryoSerializerClasses,
registeredKryoTypes,
registeredPojoTypes,
- taskCancellationIntervalMillis);
+ taskCancellationIntervalMillis,
+ useSnapshotCompression);
}
public boolean canEqual(Object obj) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 602f788..654227a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -144,6 +144,12 @@ under the License.
<artifactId>zookeeper</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.4</version>
+ </dependency>
+
<!--
The KryoSerializer dynamically loads Kryo instances via Chill and requires that Chill
is in the classpath. Because we do not want to have transitive Scala dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2b225df..30ca22e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -98,13 +98,18 @@ public abstract class AbstractKeyedStateBackend<K>
private final ExecutionConfig executionConfig;
+ /**
+ * Decoratores the input and output streams to write key-groups compressed.
+ */
+ protected final StreamCompressionDecorator keyGroupCompressionDecorator;
+
public AbstractKeyedStateBackend(
- TaskKvStateRegistry kvStateRegistry,
- TypeSerializer<K> keySerializer,
- ClassLoader userCodeClassLoader,
- int numberOfKeyGroups,
- KeyGroupRange keyGroupRange,
- ExecutionConfig executionConfig) {
+ TaskKvStateRegistry kvStateRegistry,
+ TypeSerializer<K> keySerializer,
+ ClassLoader userCodeClassLoader,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ ExecutionConfig executionConfig) {
this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -114,6 +119,15 @@ public abstract class AbstractKeyedStateBackend<K>
this.cancelStreamRegistry = new CloseableRegistry();
this.keyValueStatesByName = new HashMap<>();
this.executionConfig = executionConfig;
+ this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
+ }
+
+ private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
+ if (executionConfig != null && executionConfig.isUseSnapshotCompression()) {
+ return SnappyStreamCompressionDecorator.INSTANCE;
+ } else {
+ return UncompressedStreamCompressionDecorator.INSTANCE;
+ }
}
/**
@@ -394,4 +408,9 @@ public abstract class AbstractKeyedStateBackend<K>
public boolean supportsAsynchronousSnapshots() {
return false;
}
+
+ @VisibleForTesting
+ public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+ return keyGroupCompressionDecorator;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 2ff8cb6..30b7344 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -38,7 +38,11 @@ import java.util.List;
*/
public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
- public static final int VERSION = 3;
+ public static final int VERSION = 4;
+
+ //TODO allow for more (user defined) compression formats + backwards compatibility story.
+ /** This specifies if we use a compressed format write the key-groups */
+ private boolean usingKeyGroupCompression;
private TypeSerializer<K> keySerializer;
private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
@@ -53,7 +57,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
public KeyedBackendSerializationProxy(
TypeSerializer<K> keySerializer,
- List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots,
+ boolean compression) {
+
+ this.usingKeyGroupCompression = compression;
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
@@ -75,6 +82,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
return keySerializerConfigSnapshot;
}
+ public boolean isUsingKeyGroupCompression() {
+ return usingKeyGroupCompression;
+ }
+
@Override
public int getVersion() {
return VERSION;
@@ -83,13 +94,16 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
@Override
public int[] getCompatibleVersions() {
// we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
- return new int[] {VERSION, 2, 1};
+ return new int[] {VERSION, 3, 2, 1};
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
+ // write the compression format used to write each key-group
+ out.writeBoolean(usingKeyGroupCompression);
+
// write in a way to be fault tolerant of read failures when deserializing the key serializer
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
@@ -110,8 +124,16 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
public void read(DataInputView in) throws IOException {
super.read(in);
+ final int readVersion = getReadVersion();
+
+ if (readVersion >= 4) {
+ usingKeyGroupCompression = in.readBoolean();
+ } else {
+ usingKeyGroupCompression = false;
+ }
+
// only starting from version 3, we have the key serializer and its config snapshot written
- if (getReadVersion() >= 3) {
+ if (readVersion >= 3) {
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
this.keySerializer = (TypeSerializer<K>) keySerializerAndConfig.f0;
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index 9108306..d4244e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -51,6 +51,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
case 2:
return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
+ case 3:
// current version
case KeyedBackendSerializationProxy.VERSION:
return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
@@ -130,6 +131,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
// current version
+ case 3:
case KeyedBackendSerializationProxy.VERSION:
return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
new file mode 100644
index 0000000..194cb2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import org.xerial.snappy.SnappyFramedInputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This implementation decorates the stream with snappy compression.
+ */
+@Internal
+public class SnappyStreamCompressionDecorator extends StreamCompressionDecorator {
+
+ public static final StreamCompressionDecorator INSTANCE = new SnappyStreamCompressionDecorator();
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int COMPRESSION_BLOCK_SIZE = 64 * 1024;
+ private static final double MIN_COMPRESSION_RATIO = 0.85d;
+
+ @Override
+ protected OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException {
+ return new SnappyFramedOutputStream(stream, COMPRESSION_BLOCK_SIZE, MIN_COMPRESSION_RATIO);
+ }
+
+ @Override
+ protected InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException {
+ return new SnappyFramedInputStream(stream, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
index 8b07da8..50e1b3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.util.NonClosingStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -45,7 +45,7 @@ public class StatePartitionStreamProvider {
}
public StatePartitionStreamProvider(InputStream stream) {
- this.stream = new NonClosingStreamDecorator(Preconditions.checkNotNull(stream));
+ this.stream = new NonClosingInputStreamDecorator(Preconditions.checkNotNull(stream));
this.creationException = null;
}
@@ -59,4 +59,4 @@ public class StatePartitionStreamProvider {
}
return stream;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
new file mode 100644
index 0000000..29bc519
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Implementations of this interface decorate streams with a compression scheme. Subclasses should be stateless.
+ */
+@Internal
+public abstract class StreamCompressionDecorator implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Decorates the stream by wrapping it into a stream that applies a compression.
+ *
+ * IMPORTANT: For streams returned by this method, {@link OutputStream#close()} is not propagated to the inner
+ * stream. The inner stream must be closed separately.
+ *
+ * @param stream the stream to decorate.
+ * @return an output stream that is decorated by the compression scheme.
+ */
+ public final OutputStream decorateWithCompression(OutputStream stream) throws IOException {
+ return decorateWithCompression(new NonClosingOutpusStreamDecorator(stream));
+ }
+
+ /**
+ * IMPORTANT: For streams returned by this method, {@link InputStream#close()} is not propagated to the inner
+ * stream. The inner stream must be closed separately.
+ *
+ * @param stream the stream to decorate.
+ * @return an input stream that is decorated by the compression scheme.
+ */
+ public final InputStream decorateWithCompression(InputStream stream) throws IOException {
+ return decorateWithCompression(new NonClosingInputStreamDecorator(stream));
+ }
+
+ /**
+ * @param stream the stream to decorate
+ * @return an output stream that is decorated by the compression scheme.
+ */
+ protected abstract OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException;
+
+ /**
+ * @param stream the stream to decorate.
+ * @return an input stream that is decorated by the compression scheme.
+ */
+ protected abstract InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
new file mode 100644
index 0000000..aa04020
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This implementation does not decorate the stream with any compression.
+ */
+@Internal
+public class UncompressedStreamCompressionDecorator extends StreamCompressionDecorator {
+
+ public static final StreamCompressionDecorator INSTANCE = new UncompressedStreamCompressionDecorator();
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException {
+ return stream;
+ }
+
+ @Override
+ protected InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException {
+ return stream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 35a70bb..055274f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.state.heap;
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -57,7 +55,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -67,15 +68,21 @@ import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.RunnableFuture;
/**
@@ -316,7 +323,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
final KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
+ new KeyedBackendSerializationProxy<>(
+ keySerializer,
+ metaInfoSnapshots,
+ !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));
//--------------------------------------------------- this becomes the end of sync part
@@ -331,6 +341,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public KeyGroupsStateHandle performOperation() throws Exception {
long asyncStartTime = System.currentTimeMillis();
+
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
serializationProxy.write(outView);
@@ -343,8 +354,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
outView.writeInt(keyGroupId);
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- outView.writeShort(kVStateToId.get(kvState.getKey()));
- cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId);
+ OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream);
+ DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
+ kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
+ cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
+ kgCompressionOut.close(); // this will just close the outer stream
}
}
@@ -492,6 +506,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
+ final StreamCompressionDecorator streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
+
for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
int keyGroupIndex = groupOffset.f0;
long offset = groupOffset.f1;
@@ -503,19 +520,26 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int writtenKeyGroupIndex = inView.readInt();
- Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+ try (InputStream kgCompressionInStream =
+ streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+
+ DataInputViewStreamWrapper kgCompressionInView =
+ new DataInputViewStreamWrapper(kgCompressionInStream);
+
+ Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
"Unexpected key-group in restore.");
- for (int i = 0; i < restoredMetaInfos.size(); i++) {
- int kvStateId = inView.readShort();
- StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+ for (int i = 0; i < restoredMetaInfos.size(); i++) {
+ int kvStateId = kgCompressionInView.readShort();
+ StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
- StateTableByKeyGroupReader keyGroupReader =
+ StateTableByKeyGroupReader keyGroupReader =
StateTableByKeyGroupReaders.readerForVersion(
- stateTable,
- serializationProxy.getReadVersion());
+ stateTable,
+ serializationProxy.getReadVersion());
- keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+ keyGroupReader.readMappingsInKeyGroup(kgCompressionInView, keyGroupIndex);
+ }
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index d7bc94e..0b69a87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -48,6 +48,7 @@ class StateTableByKeyGroupReaders {
return new StateTableByKeyGroupReaderV1<>(table);
case 2:
case 3:
+ case 4:
return new StateTableByKeyGroupReaderV2V3<>(table);
default:
throw new IllegalArgumentException("Unknown version: " + version);
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
new file mode 100644
index 0000000..a539b8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Input stream, that wraps another input stream and forwards all method calls to the wrapped stream.
+ */
+@Internal
+public class ForwardingInputStream extends InputStream {
+
+ private final InputStream delegate;
+
+ public ForwardingInputStream(InputStream delegate) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return delegate.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return delegate.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return delegate.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return delegate.skip(n);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return delegate.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ delegate.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException {
+ delegate.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return delegate.markSupported();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
new file mode 100644
index 0000000..6026373
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream, that wraps another input stream and forwards all method calls to the wrapped stream.
+ */
+@Internal
+public class ForwardingOutputStream extends OutputStream {
+
+ private final OutputStream delegate;
+
+ public ForwardingOutputStream(OutputStream delegate) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ delegate.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
new file mode 100644
index 0000000..52738dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Decorator for input streams that ignores calls to {@link InputStream#close()}.
+ */
+@Internal
+public class NonClosingInputStreamDecorator extends ForwardingInputStream {
+
+ public NonClosingInputStreamDecorator(InputStream delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // ignore
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
new file mode 100644
index 0000000..e8f6183
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Decorator for input streams that ignores calls to {@link OutputStream#close()}.
+ */
+@Internal
+public class NonClosingOutpusStreamDecorator extends ForwardingOutputStream {
+
+
+ public NonClosingOutpusStreamDecorator(OutputStream delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // ignore
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
deleted file mode 100644
index ba7bc79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Decorator for input streams that ignores calls to {@link InputStream#close()}.
- */
-public class NonClosingStreamDecorator extends InputStream {
-
- private final InputStream delegate;
-
- public NonClosingStreamDecorator(InputStream delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public int read() throws IOException {
- return delegate.read();
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return delegate.read(b);
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return delegate.read(b, off, len);
- }
-
- @Override
- public long skip(long n) throws IOException {
- return delegate.skip(n);
- }
-
- @Override
- public int available() throws IOException {
- return super.available();
- }
-
- @Override
- public void close() throws IOException {
- // ignore
- }
-
- @Override
- public void mark(int readlimit) {
- super.mark(readlimit);
- }
-
- @Override
- public void reset() throws IOException {
- super.reset();
- }
-
- @Override
- public boolean markSupported() {
- return super.markSupported();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 920aa69..341d4fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -64,7 +65,7 @@ public class SerializationProxiesTest {
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
KeyedBackendSerializationProxy<?> serializationProxy =
- new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+ new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -79,6 +80,7 @@ public class SerializationProxiesTest {
serializationProxy.read(new DataInputViewStreamWrapper(in));
}
+ Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
@@ -101,7 +103,7 @@ public class SerializationProxiesTest {
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
KeyedBackendSerializationProxy<?> serializationProxy =
- new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+ new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -122,6 +124,7 @@ public class SerializationProxiesTest {
serializationProxy.read(new DataInputViewStreamWrapper(in));
}
+ Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
Assert.assertEquals(null, serializationProxy.getKeySerializer());
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
new file mode 100644
index 0000000..b932cb9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Mockito.mock;
+
+public class StateSnapshotCompressionTest {
+
+ @Test
+ public void testCompressionConfiguration() throws Exception {
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setUseSnapshotCompression(true);
+
+ AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapReducingStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15),
+ true,
+ executionConfig);
+
+ try {
+ Assert.assertTrue(SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+
+ } finally {
+ IOUtils.closeQuietly(stateBackend);
+ stateBackend.dispose();
+ }
+
+ executionConfig = new ExecutionConfig();
+ executionConfig.setUseSnapshotCompression(false);
+
+ stateBackend = new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapReducingStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15),
+ true,
+ executionConfig);
+
+ try {
+ Assert.assertTrue(UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+
+ } finally {
+ IOUtils.closeQuietly(stateBackend);
+ stateBackend.dispose();
+ }
+ }
+
+ @Test
+ public void snapshotRestoreRoundtripWithCompression() throws Exception {
+ snapshotRestoreRoundtrip(true);
+ }
+
+ @Test
+ public void snapshotRestoreRoundtripUncompressed() throws Exception {
+ snapshotRestoreRoundtrip(false);
+ }
+
+ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setUseSnapshotCompression(useCompression);
+
+ KeyedStateHandle stateHandle = null;
+
+ ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("test", String.class);
+ stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+
+ AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapReducingStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15),
+ true,
+ executionConfig);
+
+ try {
+
+ InternalValueState<VoidNamespace, String> state =
+ stateBackend.createValueState(
+ new VoidNamespaceSerializer(),
+ stateDescriptor);
+
+ stateBackend.setCurrentKey("A");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ state.update("42");
+ stateBackend.setCurrentKey("B");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ state.update("43");
+ stateBackend.setCurrentKey("C");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ state.update("44");
+ stateBackend.setCurrentKey("D");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ state.update("45");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024);
+ RunnableFuture<KeyedStateHandle> snapshot = stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+ snapshot.run();
+ stateHandle = snapshot.get();
+
+ } finally {
+ IOUtils.closeQuietly(stateBackend);
+ stateBackend.dispose();
+ }
+
+ executionConfig = new ExecutionConfig();
+
+ stateBackend = new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapReducingStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15),
+ true,
+ executionConfig);
+ try {
+
+ stateBackend.restore(Collections.singletonList(stateHandle));
+
+ InternalValueState<VoidNamespace, String> state = stateBackend.createValueState(
+ new VoidNamespaceSerializer(),
+ stateDescriptor);
+
+ stateBackend.setCurrentKey("A");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ Assert.assertEquals("42", state.value());
+ stateBackend.setCurrentKey("B");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ Assert.assertEquals("43", state.value());
+ stateBackend.setCurrentKey("C");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ Assert.assertEquals("44", state.value());
+ stateBackend.setCurrentKey("D");
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+ Assert.assertEquals("45", state.value());
+
+ } finally {
+ IOUtils.closeQuietly(stateBackend);
+ stateBackend.dispose();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..0021b81 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -111,6 +111,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
cluster.start();
env = new TestStreamEnvironment(cluster, PARALLELISM);
+ env.getConfig().setUseSnapshotCompression(true);
}
@AfterClass
@@ -318,6 +319,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
env
.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 9df0d1a..a58ec51 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.checkpointing;
-import io.netty.util.internal.ConcurrentSet;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -55,6 +54,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
+import io.netty.util.internal.ConcurrentSet;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
@@ -62,11 +63,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.util.ArrayList;
@@ -78,6 +74,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -679,6 +681,7 @@ public class RescalingITCase extends TestLogger {
}
env.enableCheckpointing(checkpointingInterval);
env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().setUseSnapshotCompression(true);
DataStream<Integer> input = env.addSource(new SubtaskIndexSource(
numberKeys,