You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:32 UTC

[3/8] flink git commit: [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 622dd0c..0697463 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -63,14 +63,18 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
@@ -177,7 +181,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private interface StateFactory {
 		<K, N, SV, S extends State, IS extends S> IS createState(
 			StateDescriptor<S, SV> stateDesc,
-			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+			Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 			RocksDBKeyedStateBackend<K> backend) throws Exception;
 	}
 
@@ -224,7 +228,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
 
 	/**
 	 * Map of state names to their corresponding restored state meta info.
@@ -256,7 +260,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
 
 	/** Factory for priority queue state. */
-	private PriorityQueueSetFactory priorityQueueFactory;
+	private final PriorityQueueSetFactory priorityQueueFactory;
+
+	private RocksDBWriteBatchWrapper writeBatchWrapper;
 
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
@@ -322,7 +328,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
 				break;
 			default:
-				break;
+				throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
 		}
 
 		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
@@ -344,12 +350,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public <N> Stream<K> getKeys(String state, N namespace) {
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = kvStateInformation.get(state);
-		if (columnInfo == null) {
+		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = kvStateInformation.get(state);
+		if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
 			return Stream.empty();
 		}
 
-		final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) columnInfo.f1.getNamespaceSerializer();
+		RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
+			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
+
+		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
 		final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
 		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		final byte[] nameSpaceBytes;
@@ -396,6 +405,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		// working on the disposed object results in SEGFAULTS.
 		if (db != null) {
 
+			IOUtils.closeQuietly(writeBatchWrapper);
+
 			// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
 			// DB is closed. See:
 			// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
@@ -403,16 +414,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			IOUtils.closeQuietly(defaultColumnFamily);
 
 			// ... continue with the ones created by Flink...
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData :
 				kvStateInformation.values()) {
 				IOUtils.closeQuietly(columnMetaData.f0);
 			}
 
-			// ... then close the priority queue related resources ...
-			if (priorityQueueFactory instanceof AutoCloseable) {
-				IOUtils.closeQuietly((AutoCloseable) priorityQueueFactory);
-			}
-
 			// ... and finally close the DB instance ...
 			IOUtils.closeQuietly(db);
 
@@ -431,13 +437,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Nonnull
 	@Override
-	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+	create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor) {
-
-		return priorityQueueFactory.create(stateName, byteOrderedElementSerializer, elementComparator, keyExtractor);
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+		return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
 	}
 
 	private void cleanInstanceBasePath() {
@@ -482,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
+
 		LOG.info("Initializing RocksDB keyed state backend.");
 
 		if (LOG.isDebugEnabled()) {
@@ -534,6 +539,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private void createDB() throws IOException {
 		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
 		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+		this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
 		this.defaultColumnFamily = columnFamilyHandles.get(0);
 	}
 
@@ -679,7 +685,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
 
-				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+				Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
 					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
 
 				if (registeredColumn == null) {
@@ -689,8 +695,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						nameBytes,
 						rocksDBKeyedStateBackend.columnOptions);
 
-					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-						new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+					RegisteredStateMetaInfoBase stateMetaInfo =
+						RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
 
 					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
@@ -987,12 +993,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ColumnFamilyHandle columnFamilyHandle,
 			StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
 
-			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredStateMetaInfoEntry =
 				stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
 			if (null == registeredStateMetaInfoEntry) {
-				RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-					new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+				RegisteredStateMetaInfoBase stateMetaInfo =
+					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
 				registeredStateMetaInfoEntry =
 					new Tuple2<>(
@@ -1037,6 +1043,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					stateBackend.db = restoreDBInfo.db;
 					stateBackend.defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
+					stateBackend.writeBatchWrapper =
+						new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
 
 					for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
 						getOrRegisterColumnFamilyHandle(
@@ -1061,6 +1069,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					Collections.emptyList(),
 					columnFamilyHandles);
 				stateBackend.defaultColumnFamily = columnFamilyHandles.get(0);
+				stateBackend.writeBatchWrapper =
+					new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
 			}
 		}
 
@@ -1115,13 +1125,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			// extract and store the default column family which is located at the first index
 			stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);
+			stateBackend.writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
 
 			for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
 				StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
 				ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-				RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-					new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+				RegisteredStateMetaInfoBase stateMetaInfo =
+					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
 				stateBackend.kvStateInformation.put(
 					stateMetaInfoSnapshot.getName(),
@@ -1290,14 +1301,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
 	 * or create a new one if it does not exist.
 	 */
-	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
+	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
 			StateDescriptor<?, S> stateDesc,
 			TypeSerializer<N> namespaceSerializer) throws StateMigrationException {
 
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
 			kvStateInformation.get(stateDesc.getName());
 
-		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo;
+		RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
 		if (stateInfo != null) {
 
 			@SuppressWarnings("unchecked")
@@ -1308,7 +1319,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
 					" but its corresponding restored snapshot cannot be found.");
 
-			newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
 				restoredMetaInfoSnapshot,
 				namespaceSerializer,
 				stateDesc);
@@ -1317,13 +1328,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		} else {
 			String stateName = stateDesc.getName();
 
-			newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
 				stateDesc.getType(),
 				stateName,
 				namespaceSerializer,
 				stateDesc.getSerializer());
 
-			ColumnFamilyHandle columnFamily = createColumnFamily(stateName, db);
+			ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
 
 			stateInfo = Tuple2.of(columnFamily, newMetaInfo);
 			kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1335,7 +1346,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Creates a column family handle for use with a k/v state.
 	 */
-	private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) {
+	private ColumnFamilyHandle createColumnFamily(String stateName) {
 		byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
 		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
 			"The chosen state name 'default' collides with the name of the default column family!");
@@ -1359,7 +1370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				stateDesc.getClass(), this.getClass());
 			throw new FlinkRuntimeException(message);
 		}
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult =
 			tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 		return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
 	}
@@ -1382,7 +1393,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public int numStateEntries() {
 		int count = 0;
 
-		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+		for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : kvStateInformation.values()) {
+			//TODO maybe filter only for k/v states
 			try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
 				rocksIterator.seekToFirst();
 
@@ -1405,7 +1417,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	@VisibleForTesting
 	static final class RocksDBMergeIterator implements AutoCloseable {
 
-		private final PriorityQueue<MergeIterator> heap;
+		private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
 		private final int keyGroupPrefixByteCount;
 		private boolean newKeyGroup;
 		private boolean newKVState;
@@ -1763,6 +1775,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
 
+			// flush everything into db before taking a snapshot
+			writeBatchWrapper.flush();
+
 			final RocksDBFullSnapshotOperation<K> snapshotOperation =
 				new RocksDBFullSnapshotOperation<>(
 					RocksDBKeyedStateBackend.this,
@@ -1982,7 +1997,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
 
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
 				stateBackend.kvStateInformation.values()) {
 				// snapshot meta info
 				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
@@ -2433,7 +2448,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
 
 			// save meta data
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
 				: stateBackend.kvStateInformation.entrySet()) {
 				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
 			}
@@ -2625,10 +2640,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
 	 */
-	class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory, AutoCloseable {
+	class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
 		/** Default cache size per key-group. */
-		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
+		private static final int DEFAULT_CACHES_SIZE = 1024;
 
 		/** A shared buffer to serialize elements for the priority queue. */
 		@Nonnull
@@ -2638,68 +2653,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		@Nonnull
 		private final DataOutputViewStreamWrapper elementSerializationOutView;
 
-		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
-		@Nonnull
-		private final RocksDBWriteBatchWrapper writeBatchWrapper;
-
-		/** Map to track all column families created to back priority queues. */
-		@Nonnull
-		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
-
-		/** The mandatory default column family, so that we can close it later. */
-		@Nonnull
-		private final ColumnFamilyHandle defaultColumnFamily;
-
-		/** Path of the RocksDB instance that holds the priority queues. */
-		@Nonnull
-		private final File pqInstanceRocksDBPath;
-
-		/** RocksDB instance that holds the priority queues. */
-		@Nonnull
-		private final RocksDB pqDb;
-
-		RocksDBPriorityQueueSetFactory() throws IOException {
-			this.pqInstanceRocksDBPath = new File(instanceBasePath, "pqdb");
-			if (pqInstanceRocksDBPath.exists()) {
-				try {
-					FileUtils.deleteDirectory(pqInstanceRocksDBPath);
-				} catch (IOException ex) {
-					LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
-				}
-			}
-			List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-			this.pqDb = openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+		RocksDBPriorityQueueSetFactory() {
 			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
 			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
-			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(pqDb, writeOptions);
-			this.defaultColumnFamily = columnFamilyHandles.get(0);
-			this.priorityQueueColumnFamilies = new HashMap<>();
 		}
 
 		@Nonnull
 		@Override
-		public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+		public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+		create(
 			@Nonnull String stateName,
-			@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-			@Nonnull PriorityComparator<T> elementPriorityComparator,
-			@Nonnull KeyExtractorFunction<T> keyExtractor) {
+			@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+			final PriorityComparator<T> priorityComparator =
+				PriorityComparator.forPriorityComparableObjects();
+
+			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
+				kvStateInformation.get(stateName);
 
-			final ColumnFamilyHandle columnFamilyHandle =
-				priorityQueueColumnFamilies.computeIfAbsent(
-					stateName,
-					(name) -> RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb));
+			if (entry == null) {
+				RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+					new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
+
+				final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
+
+				entry = new Tuple2<>(columnFamilyHandle, metaInfo);
+				kvStateInformation.put(stateName, entry);
+			}
+
+			final ColumnFamilyHandle columnFamilyHandle = entry.f0;
 
 			@Nonnull
 			TieBreakingPriorityComparator<T> tieBreakingComparator =
 				new TieBreakingPriorityComparator<>(
-					elementPriorityComparator,
+					priorityComparator,
 					byteOrderedElementSerializer,
 					elementSerializationOutStream,
 					elementSerializationOutView);
 
 			return new KeyGroupPartitionedPriorityQueue<>(
-				keyExtractor,
-				elementPriorityComparator,
+				KeyExtractorFunction.forKeyedObjects(),
+				priorityComparator,
 				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
 					@Nonnull
 					@Override
@@ -2714,7 +2708,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							new RocksDBOrderedSetStore<>(
 								keyGroupId,
 								keyGroupPrefixBytes,
-								pqDb,
+								db,
 								columnFamilyHandle,
 								byteOrderedElementSerializer,
 								elementSerializationOutStream,
@@ -2727,20 +2721,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				keyGroupRange,
 				numberOfKeyGroups);
 		}
+	}
 
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(writeBatchWrapper);
-			for (ColumnFamilyHandle columnFamilyHandle : priorityQueueColumnFamilies.values()) {
-				IOUtils.closeQuietly(columnFamilyHandle);
-			}
-			IOUtils.closeQuietly(defaultColumnFamily);
-			IOUtils.closeQuietly(pqDb);
-			try {
-				FileUtils.deleteDirectory(pqInstanceRocksDBPath);
-			} catch (IOException ex) {
-				LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
-			}
-		}
+	@Override
+	public boolean requiresLegacySynchronousTimerSnapshots() {
+		return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 03faa44..aa5e93a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -257,7 +257,7 @@ class RocksDBListState<K, N, V>
 	@SuppressWarnings("unchecked")
 	static <E, K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBListState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 9d00a67..4ec1f77 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -616,7 +616,7 @@ class RocksDBMapState<K, N, UK, UV>
 	@SuppressWarnings("unchecked")
 	static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBMapState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
index 5284314..4068c50 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -159,7 +159,6 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
 	public RocksToJavaIteratorAdapter orderedIterator() {
 
 		flushWriteBatch();
-
 		return new RocksToJavaIteratorAdapter(
 			new RocksIteratorWrapper(
 				db.newIterator(columnFamilyHandle)));

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index d138045..490960e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -172,7 +172,7 @@ class RocksDBReducingState<K, N, V>
 	@SuppressWarnings("unchecked")
 	static <K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBReducingState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 2b60fc1..5ae894e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -116,7 +116,7 @@ class RocksDBValueState<K, N, V>
 	@SuppressWarnings("unchecked")
 	static <K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBValueState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
index d54f122..05adf4f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -102,6 +102,10 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable {
 		batch.clear();
 	}
 
+	public WriteOptions getOptions() {
+		return this.options;
+	}
+
 	@Override
 	public void close() throws RocksDBException {
 		if (batch.count() != 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 797a26a..52ba3e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -402,7 +402,11 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @param context context that provides information and means required for taking a snapshot
 	 */
 	public void snapshotState(StateSnapshotContext context) throws Exception {
-		if (getKeyedStateBackend() != null) {
+		final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+		//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
+		if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
+			((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
+
 			KeyedStateCheckpointOutputStream out;
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
deleted file mode 100644
index cc7fbc4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
-import org.apache.flink.runtime.state.KeyGroupPartitioner;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.lang.reflect.Array;
-
-/**
- * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
- *
- * @param <T> type of the state elements.
- */
-public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
-
-	/** Function that extracts keys from elements. */
-	@Nonnull
-	private final KeyExtractorFunction<T> keyExtractor;
-
-	/** Copy of the heap array containing all the (immutable or deeply copied) elements. */
-	@Nonnull
-	private final T[] heapArrayCopy;
-
-	/** The element serializer. */
-	@Nonnull
-	private final TypeSerializer<T> elementSerializer;
-
-	/** The key-group range covered by this snapshot. */
-	@Nonnull
-	private final KeyGroupRange keyGroupRange;
-
-	/** The total number of key-groups in the job. */
-	@Nonnegative
-	private final int totalKeyGroups;
-
-	/** Result of partitioning the snapshot by key-group. */
-	@Nullable
-	private KeyGroupPartitionedSnapshot partitionedSnapshot;
-
-	HeapPriorityQueueStateSnapshot(
-		@Nonnull T[] heapArrayCopy,
-		@Nonnull KeyExtractorFunction<T> keyExtractor,
-		@Nonnull TypeSerializer<T> elementSerializer,
-		@Nonnull KeyGroupRange keyGroupRange,
-		@Nonnegative int totalKeyGroups) {
-
-		// TODO ensure that the array contains a deep copy of elements if we are *not* dealing with immutable types.
-		assert elementSerializer.isImmutableType();
-
-		this.keyExtractor = keyExtractor;
-		this.heapArrayCopy = heapArrayCopy;
-		this.elementSerializer = elementSerializer;
-		this.keyGroupRange = keyGroupRange;
-		this.totalKeyGroups = totalKeyGroups;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Nonnull
-	@Override
-	public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
-
-		if (partitionedSnapshot == null) {
-
-			T[] partitioningOutput = (T[]) Array.newInstance(
-				heapArrayCopy.getClass().getComponentType(),
-				heapArrayCopy.length);
-
-			KeyGroupPartitioner<T> keyGroupPartitioner =
-				new KeyGroupPartitioner<>(
-					heapArrayCopy,
-					heapArrayCopy.length,
-					partitioningOutput,
-					keyGroupRange,
-					totalKeyGroups,
-					keyExtractor,
-					elementSerializer::serialize);
-
-			partitionedSnapshot = keyGroupPartitioner.partitionByKeyGroup();
-		}
-
-		return partitionedSnapshot;
-	}
-
-	@Override
-	public void release() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index ad1617e..29b89c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -46,6 +46,11 @@ import java.util.Map;
 @Internal
 public class InternalTimeServiceManager<K> {
 
+	//TODO guard these constants with a test
+	private static final String TIMER_STATE_PREFIX = "_timer_state";
+	private static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
+	private static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
+
 	private final int totalKeyGroups;
 	private final KeyGroupRange localKeyGroupRange;
 	private final KeyContext keyContext;
@@ -55,12 +60,14 @@ public class InternalTimeServiceManager<K> {
 
 	private final Map<String, HeapInternalTimerService<K, ?>> timerServices;
 
+	private final boolean useLegacySynchronousSnapshots;
+
 	InternalTimeServiceManager(
-			int totalKeyGroups,
-			KeyGroupRange localKeyGroupRange,
-			KeyContext keyContext,
-			PriorityQueueSetFactory priorityQueueSetFactory,
-			ProcessingTimeService processingTimeService) {
+		int totalKeyGroups,
+		KeyGroupRange localKeyGroupRange,
+		KeyContext keyContext,
+		PriorityQueueSetFactory priorityQueueSetFactory,
+		ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
 
 		Preconditions.checkArgument(totalKeyGroups > 0);
 		this.totalKeyGroups = totalKeyGroups;
@@ -68,6 +75,7 @@ public class InternalTimeServiceManager<K> {
 		this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
 		this.keyContext = Preconditions.checkNotNull(keyContext);
 		this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
+		this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
 
 		this.timerServices = new HashMap<>();
 	}
@@ -97,8 +105,8 @@ public class InternalTimeServiceManager<K> {
 				localKeyGroupRange,
 				keyContext,
 				processingTimeService,
-				createTimerPriorityQueue("__ts_" + name + "/processing_timers", timerSerializer),
-				createTimerPriorityQueue("__ts_" + name + "/event_timers", timerSerializer));
+				createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
+				createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
 
 			timerServices.put(name, timerService);
 		}
@@ -114,9 +122,7 @@ public class InternalTimeServiceManager<K> {
 		TimerSerializer<K, N> timerSerializer) {
 		return priorityQueueSetFactory.create(
 			name,
-			timerSerializer,
-			InternalTimer.getTimerComparator(),
-			InternalTimer.getKeyExtractorFunction());
+			timerSerializer);
 	}
 
 	public void advanceWatermark(Watermark watermark) throws Exception {
@@ -128,6 +134,7 @@ public class InternalTimeServiceManager<K> {
 	//////////////////				Fault Tolerance Methods				///////////////////
 
 	public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
+		Preconditions.checkState(useLegacySynchronousSnapshots);
 		InternalTimerServiceSerializationProxy<K> serializationProxy =
 			new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
 
@@ -148,6 +155,10 @@ public class InternalTimeServiceManager<K> {
 		serializationProxy.read(stream);
 	}
 
+	public boolean isUseLegacySynchronousSnapshots() {
+		return this.useLegacySynchronousSnapshots;
+	}
+
 	////////////////////			Methods used ONLY IN TESTS				////////////////////
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
index f88b4fb..324f3fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 
 import javax.annotation.Nonnull;
@@ -31,7 +33,7 @@ import javax.annotation.Nonnull;
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimer<K, N> {
+public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {
 
 	/** Function to extract the key from a {@link InternalTimer}. */
 	KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;
@@ -48,6 +50,7 @@ public interface InternalTimer<K, N> {
 	 * Returns the key that is bound to this timer.
 	 */
 	@Nonnull
+	@Override
 	K getKey();
 
 	/**
@@ -55,14 +58,4 @@ public interface InternalTimer<K, N> {
 	 */
 	@Nonnull
 	N getNamespace();
-
-	@SuppressWarnings("unchecked")
-	static <T extends InternalTimer> PriorityComparator<T> getTimerComparator() {
-		return (PriorityComparator<T>) TIMER_COMPARATOR;
-	}
-
-	@SuppressWarnings("unchecked")
-	static <T extends InternalTimer> KeyExtractorFunction<T> getKeyExtractorFunction() {
-		return (KeyExtractorFunction<T>) KEY_EXTRACTOR_FUNCTION;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index dbb74e5..4f762e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -19,9 +19,11 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -29,6 +31,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -96,7 +100,7 @@ public class InternalTimersSnapshotReaderWriters {
 		public final void writeTimersSnapshot(DataOutputView out) throws IOException {
 			writeKeyAndNamespaceSerializers(out);
 
-			TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>(
+			LegacyTimerSerializer<K, N> timerSerializer = new LegacyTimerSerializer<>(
 				timersSnapshot.getKeySerializer(),
 				timersSnapshot.getNamespaceSerializer());
 
@@ -215,8 +219,8 @@ public class InternalTimersSnapshotReaderWriters {
 
 			restoreKeyAndNamespaceSerializers(restoredTimersSnapshot, in);
 
-			TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer =
-				new TimerHeapInternalTimer.TimerSerializer<>(
+			LegacyTimerSerializer<K, N> timerSerializer =
+				new LegacyTimerSerializer<>(
 					restoredTimersSnapshot.getKeySerializer(),
 					restoredTimersSnapshot.getNamespaceSerializer());
 
@@ -289,4 +293,120 @@ public class InternalTimersSnapshotReaderWriters {
 			restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
 		}
 	}
+
+	/**
+	 * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
+	 */
+	public static class LegacyTimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
+
+		private static final long serialVersionUID = 1119562170939152304L;
+
+		@Nonnull
+		private final TypeSerializer<K> keySerializer;
+
+		@Nonnull
+		private final TypeSerializer<N> namespaceSerializer;
+
+		LegacyTimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
+			this.keySerializer = keySerializer;
+			this.namespaceSerializer = namespaceSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
+
+			final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
+			final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
+
+			if (keySerializerDuplicate == keySerializer &&
+				namespaceSerializerDuplicate == namespaceSerializer) {
+				// all delegate serializers seem stateless, so this is also stateless.
+				return this;
+			} else {
+				// at least one delegate serializer seems to be stateful, so we return a new instance.
+				return new LegacyTimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
+			}
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> createInstance() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
+			return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			// we do not have fixed length
+			return -1;
+		}
+
+		@Override
+		public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
+			keySerializer.serialize(record.getKey(), target);
+			namespaceSerializer.serialize(record.getNamespace(), target);
+			LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
+			K key = keySerializer.deserialize(source);
+			N namespace = namespaceSerializer.deserialize(source);
+			Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+			return new TimerHeapInternalTimer<>(timestamp, key, namespace);
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			keySerializer.copy(source, target);
+			namespaceSerializer.copy(source, target);
+			LongSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this ||
+				(obj != null && obj.getClass() == getClass() &&
+					keySerializer.equals(((LegacyTimerSerializer) obj).keySerializer) &&
+					namespaceSerializer.equals(((LegacyTimerSerializer) obj).namespaceSerializer));
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
+
+		@Override
+		public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index ca9cb0b..a6bee4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -209,7 +209,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 			keyGroupRange,
 			keyContext,
 			keyedStatedBackend,
-			processingTimeService);
+			processingTimeService,
+			keyedStatedBackend.requiresLegacySynchronousTimerSnapshots());
 
 		// and then initialize the timer services
 		for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index b9ef88e..a6194ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -19,19 +19,11 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-
 /**
  * Implementation of {@link InternalTimer} to use with a {@link HeapPriorityQueueSet}.
  *
@@ -133,119 +125,8 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>,
 				'}';
 	}
 
-	/**
-	 * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
-	 */
-	public static class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
-
-		private static final long serialVersionUID = 1119562170939152304L;
-
-		@Nonnull
-		private final TypeSerializer<K> keySerializer;
-
-		@Nonnull
-		private final TypeSerializer<N> namespaceSerializer;
-
-		TimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
-			this.keySerializer = keySerializer;
-			this.namespaceSerializer = namespaceSerializer;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
-
-			final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
-			final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
-
-			if (keySerializerDuplicate == keySerializer &&
-				namespaceSerializerDuplicate == namespaceSerializer) {
-				// all delegate serializers seem stateless, so this is also stateless.
-				return this;
-			} else {
-				// at least one delegate serializer seems to be stateful, so we return a new instance.
-				return new TimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
-			}
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> createInstance() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
-			return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			// we do not have fixed length
-			return -1;
-		}
-
-		@Override
-		public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
-			keySerializer.serialize(record.getKey(), target);
-			namespaceSerializer.serialize(record.getNamespace(), target);
-			LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
-			K key = keySerializer.deserialize(source);
-			N namespace = namespaceSerializer.deserialize(source);
-			Long timestamp = LongSerializer.INSTANCE.deserialize(source);
-			return new TimerHeapInternalTimer<>(timestamp, key, namespace);
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			keySerializer.copy(source, target);
-			namespaceSerializer.copy(source, target);
-			LongSerializer.INSTANCE.copy(source, target);
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj == this ||
-				(obj != null && obj.getClass() == getClass() &&
-					keySerializer.equals(((TimerSerializer) obj).keySerializer) &&
-					namespaceSerializer.equals(((TimerSerializer) obj).namespaceSerializer));
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			return getClass().hashCode();
-		}
-
-		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
-		}
-
-		@Override
-		public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
-		}
+	@Override
+	public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
+		return Long.compare(timestamp, other.getTimestamp());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index 87a3159..73f42ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -31,8 +32,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 /**
- * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is aligned with
- * {@link InternalTimer#getTimerComparator()}.
+ * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is
+ * lexicographically aligned with the priority of the timers.
  *
  * @param <K> type of the timer key.
  * @param <N> type of the timer namespace.
@@ -201,13 +202,14 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 
 	@Override
 	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+		return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
 	}
 
 	@Override
 	public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(
 		TypeSerializerConfigSnapshot configSnapshot) {
-		throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+		//TODO this is just a mock (assuming no serializer updates) for now and needs a proper implementation! change this before release.
+		return CompatibilityResult.compatible();
 	}
 
 	@Nonnull
@@ -219,4 +221,25 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	public TypeSerializer<N> getNamespaceSerializer() {
 		return namespaceSerializer;
 	}
+
+	/**
+	 * Configuration snapshot of a {@link TimerSerializer}, composed of its key and namespace serializers.
+	 *
+	 * @param <K> type of the timer key.
+	 * @param <N> type of the timer namespace.
+	 */
+	public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot {
+
+		public TimerSerializerConfigSnapshot() {
+		}
+
+		public TimerSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
+			super(keySerializer, namespaceSerializer);
+		}
+
+		@Override
+		public int getVersion() {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 519f10e..957a535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -910,8 +910,6 @@ public class HeapInternalTimerServiceTest {
 		PriorityQueueSetFactory priorityQueueSetFactory) {
 		return priorityQueueSetFactory.create(
 			name,
-			timerSerializer,
-			InternalTimer.getTimerComparator(),
-			InternalTimer.getKeyExtractorFunction());
+			timerSerializer);
 	}
 }