You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/12 18:19:48 UTC

[1/3] flink git commit: [FLINK-9701] Introduce TTL configuration in state descriptors

Repository: flink
Updated Branches:
  refs/heads/master b407ba792 -> f45b7f7ff


http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java
new file mode 100644
index 0000000..a590828
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.StateBackendTestContext;
+import org.apache.flink.runtime.state.ttl.TtlStateTestBase;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TernaryBoolean;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/** Test suite for rocksdb state TTL, extending {@link TtlStateTestBase} with a {@link RocksDBStateBackend}-based test context. */
+public class RocksDBTtlStateTest extends TtlStateTestBase {
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder(); // provides the db and checkpoint folders used below
+
+	@Override
+	protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
+		return new StateBackendTestContext(timeProvider) {
+			@Override
+			protected StateBackend createStateBackend() {
+				return RocksDBTtlStateTest.this.createStateBackend(); // delegate to the outer class's factory method
+			}
+		};
+	}
+
+	private StateBackend createStateBackend() { // RocksDB backend wrapping an FsStateBackend, each on a fresh temp folder
+		String dbPath;
+		String checkpointPath;
+		try {
+			dbPath = tempFolder.newFolder().getAbsolutePath();
+			checkpointPath = tempFolder.newFolder().toURI().toString();
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Failed to init rocksdb test state backend", e); // keep the I/O failure as the cause
+		}
+		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.FALSE);
+		backend.setDbStoragePath(dbPath);
+		return backend;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 594f337..ca9cb0b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CloseableIterable;
@@ -270,7 +271,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 					keySerializer,
 					taskInfo.getMaxNumberOfParallelSubtasks(),
 					keyGroupRange,
-					environment.getTaskKvStateRegistry()),
+					environment.getTaskKvStateRegistry(),
+					TtlTimeProvider.DEFAULT),
 				backendCloseableRegistry,
 				logDescription);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 000cf31..6233c4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImplTest;
 import org.apache.flink.runtime.state.TestTaskLocalStateStore;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -138,7 +139,8 @@ public class StreamTaskStateInitializerImplTest {
 				String operatorIdentifier,
 				TypeSerializer<K> keySerializer,
 				int numberOfKeyGroups, KeyGroupRange keyGroupRange,
-				TaskKvStateRegistry kvStateRegistry) throws Exception {
+				TaskKvStateRegistry kvStateRegistry,
+				TtlTimeProvider ttlTimeProvider) throws Exception {
 				return mock(AbstractKeyedStateBackend.class);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index e5558f6..aebad54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
@@ -261,7 +262,8 @@ public class StreamTaskTerminationTest extends TestLogger {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) {
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) {
 			return null;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
index 914326b..214fab5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -53,7 +54,8 @@ public class TestSpyWrapperStateBackend extends AbstractStateBackend {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws IOException {
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) throws IOException {
 			return spy(delegate.createKeyedStateBackend(
 				env,
 				jobID,
@@ -61,7 +63,8 @@ public class TestSpyWrapperStateBackend extends AbstractStateBackend {
 				keySerializer,
 				numberOfKeyGroups,
 				keyGroupRange,
-				kvStateRegistry));
+				kvStateRegistry,
+				ttlTimeProvider));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 46c408e..f3bae16 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.ExceptionUtils;
@@ -105,13 +106,14 @@ public class StateBackendITCase extends AbstractTestBase {
 
 		@Override
 		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-				Environment env,
-				JobID jobID,
-				String operatorIdentifier,
-				TypeSerializer<K> keySerializer,
-				int numberOfKeyGroups,
-				KeyGroupRange keyGroupRange,
-				TaskKvStateRegistry kvStateRegistry) throws IOException {
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) throws IOException {
 			throw new SuccessException();
 		}
 


[2/3] flink git commit: [FLINK-9701] Introduce TTL configuration in state descriptors

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
deleted file mode 100644
index 893f9ae..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.StreamSupport;
-
-/** Test suite for {@link TtlListState}. */
-public class TtlListStateTest
-	extends TtlMergingStateBase<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.addAll(v);
-		getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
-		originalGetter = () -> ttlState.original.get();
-
-		emptyValue = Collections.emptyList();
-
-		updateEmpty = Arrays.asList(5, 7, 10);
-		updateUnexpired = Arrays.asList(8, 9, 11);
-		updateExpired = Arrays.asList(1, 4);
-
-		getUpdateEmpty = updateEmpty;
-		getUnexpired = updateUnexpired;
-		getUpdateExpired = updateExpired;
-	}
-
-	@Override
-	TtlListState<?, String, Integer> createState() {
-		ListStateDescriptor<Integer> listStateDesc =
-			new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
-		return (TtlListState<?, String, Integer>) wrapMockState(listStateDesc);
-	}
-
-	@Override
-	List<Integer> generateRandomUpdate() {
-		int size = RANDOM.nextInt(5);
-		return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
-	}
-
-	@Override
-	Iterable<Integer> getMergeResult(
-		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
-		List<Integer> result = new ArrayList<>();
-		finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
-		return result;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
new file mode 100644
index 0000000..c113bf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/** Test context for {@link TtlListState}: fixed test values plus update/get accessors over the TTL list state. */
+class TtlListStateTestContext
+	extends TtlMergingStateTestContext<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
+	@Override
+	void initTestValues() {
+		emptyValue = Collections.emptyList();
+
+		updateEmpty = Arrays.asList(5, 7, 10);
+		updateUnexpired = Arrays.asList(8, 9, 11);
+		updateExpired = Arrays.asList(1, 4);
+		// the expected reads are the very lists that were written
+		getUpdateEmpty = updateEmpty;
+		getUnexpired = updateUnexpired;
+		getUpdateExpired = updateExpired;
+	}
+
+	@Override
+	void update(List<Integer> value) throws Exception {
+		ttlState.addAll(value);
+	}
+
+	@Override
+	Iterable<Integer> get() throws Exception {
+		return StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList()); // copy the state's iterable into a list
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get() == null ? emptyValue : ttlState.original.get(); // null from the wrapped state is reported as emptyValue
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE); // unchecked: US/SV are chosen by the caller
+	}
+
+	@Override
+	List<Integer> generateRandomUpdate() {
+		int size = RANDOM.nextInt(5);
+		return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
+	}
+
+	@Override
+	Iterable<Integer> getMergeResult(
+		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
+		List<Integer> result = new ArrayList<>(); // concatenation of the final updates; unexpiredUpdatesToMerge is not used here
+		finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
+		return result;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
new file mode 100644
index 0000000..7fd61aa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Test context for the collection methods ({@code putAll}, {@code entries}) of {@link TtlMapState}. */
+class TtlMapStateAllEntriesTestContext extends
+	TtlMapStateTestContext<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
+
+	@Override
+	void initTestValues() {
+		emptyValue = Collections.emptySet();
+
+		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
+		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
+		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+		// the expected reads are the entry sets of the maps that were written
+		getUpdateEmpty = updateEmpty.entrySet();
+		getUnexpired = updateUnexpired.entrySet();
+		getUpdateExpired = updateExpired.entrySet();
+	}
+
+	@SafeVarargs
+	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
+		return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); // f0 becomes the key, f1 the value
+	}
+
+	@Override
+	void update(Map<Integer, String> map) throws Exception {
+		ttlState.putAll(map);
+	}
+
+	@Override
+	Set<Map.Entry<Integer, String>> get() throws Exception {
+		return StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet()); // copy the entries iterable into a set
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.entries() == null ? Collections.emptySet() : ttlState.original.entries(); // null entries are reported as an empty set
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
deleted file mode 100644
index d6949e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-/** Test suite for per element methods of {@link TtlMapState}. */
-public class TtlMapStatePerElementTest extends TtlMapStateTestBase<String, String> {
-	private static final int TEST_KEY = 1;
-	private static final String TEST_VAL1 = "test value1";
-	private static final String TEST_VAL2 = "test value2";
-	private static final String TEST_VAL3 = "test value3";
-
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.put(TEST_KEY, v);
-		getter = () -> ttlState.get(TEST_KEY);
-		originalGetter = () -> ttlState.original.get(TEST_KEY);
-
-		updateEmpty = TEST_VAL1;
-		updateUnexpired = TEST_VAL2;
-		updateExpired = TEST_VAL3;
-
-		getUpdateEmpty = TEST_VAL1;
-		getUnexpired = TEST_VAL2;
-		getUpdateExpired = TEST_VAL3;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
new file mode 100644
index 0000000..fb025af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Test context for the per element methods ({@code put}, {@code get}) of {@link TtlMapState}, using one fixed key. */
+class TtlMapStatePerElementTestContext extends TtlMapStateTestContext<String, String> {
+	private static final int TEST_KEY = 1; // the single map key every access in this context goes through
+	private static final String TEST_VAL1 = "test value1";
+	private static final String TEST_VAL2 = "test value2";
+	private static final String TEST_VAL3 = "test value3";
+
+	@Override
+	void initTestValues() {
+		updateEmpty = TEST_VAL1;
+		updateUnexpired = TEST_VAL2;
+		updateExpired = TEST_VAL3;
+		// each written value is expected to be read back unchanged
+		getUpdateEmpty = TEST_VAL1;
+		getUnexpired = TEST_VAL2;
+		getUpdateExpired = TEST_VAL3;
+	}
+
+	@Override
+	void update(String value) throws Exception {
+		ttlState.put(TEST_KEY, value);
+	}
+
+	@Override
+	String get() throws Exception {
+		return ttlState.get(TEST_KEY);
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get(TEST_KEY); // reads the wrapped state directly, bypassing the TTL wrapper's own get
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
deleted file mode 100644
index bac2f41..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/** Test suite for collection methods of {@link TtlMapState}. */
-public class TtlMapStateTest extends
-	TtlMapStateTestBase<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
-
-	@Override
-	void initTestValues() {
-		updater = map -> ttlState.putAll(map);
-		getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
-		originalGetter = () -> ttlState.original.entries();
-
-		emptyValue = Collections.emptySet();
-
-		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
-		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
-		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
-
-		getUpdateEmpty = updateEmpty.entrySet();
-		getUnexpired = updateUnexpired.entrySet();
-		getUpdateExpired = updateExpired.entrySet();
-	}
-
-	@SafeVarargs
-	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
-		return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
deleted file mode 100644
index dab3194..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-abstract class TtlMapStateTestBase<UV, GV>
-	extends TtlStateTestBase<TtlMapState<?, String, Integer, String>, UV, GV> {
-	@Override
-	TtlMapState<?, String, Integer, String> createState() {
-		MapStateDescriptor<Integer, String> mapStateDesc =
-			new MapStateDescriptor<>("TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
-		return (TtlMapState<?, String, Integer, String>) wrapMockState(mapStateDesc);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
new file mode 100644
index 0000000..bb7b327
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Abstract TTL test context for map state; provides the descriptor of the state under test. */
+abstract class TtlMapStateTestContext<UV, GV>
+	extends TtlStateTestContextBase<TtlMapState<?, String, Integer, String>, UV, GV> {
+	@Override
+	@SuppressWarnings("unchecked") // US/SV are picked by the caller, the descriptor is always a map descriptor
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new MapStateDescriptor<>("TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
deleted file mode 100644
index 6a7aebe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.internal.InternalMergingState;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-abstract class TtlMergingStateBase<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
-	extends TtlStateTestBase<S, UV, GV> {
-	static final Random RANDOM = new Random();
-
-	private static final List<String> NAMESPACES = Arrays.asList(
-		"unsetNamespace1",
-		"unsetNamespace2",
-		"expiredNamespace",
-		"expiredAndUpdatedNamespace",
-		"unexpiredNamespace",
-		"finalNamespace");
-
-	@Test
-	public void testMergeNamespaces() throws Exception {
-		initTest();
-
-		timeProvider.time = 0;
-		List<Tuple2<String, UV>> expiredUpdatesToMerge = generateExpiredUpdatesToMerge();
-		applyStateUpdates(expiredUpdatesToMerge);
-
-		timeProvider.time = 120;
-		List<Tuple2<String, UV>> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge();
-		applyStateUpdates(unexpiredUpdatesToMerge);
-
-		timeProvider.time = 150;
-		List<Tuple2<String, UV>> finalUpdatesToMerge = generateFinalUpdatesToMerge();
-		applyStateUpdates(finalUpdatesToMerge);
-
-		timeProvider.time = 230;
-		ttlState.mergeNamespaces("targetNamespace", NAMESPACES);
-		ttlState.setCurrentNamespace("targetNamespace");
-		assertEquals("Unexpected result of merge operation",
-			getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get());
-	}
-
-	private List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
-		return Arrays.asList(
-			Tuple2.of("expiredNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
-		);
-	}
-
-	private List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
-		return Arrays.asList(
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate())
-		);
-	}
-
-	private List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
-		return Arrays.asList(
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
-			Tuple2.of("finalNamespace", generateRandomUpdate()),
-			Tuple2.of("finalNamespace", generateRandomUpdate())
-		);
-	}
-
-	abstract UV generateRandomUpdate();
-
-	private void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
-		for (Tuple2<String, UV> t : updates) {
-			ttlState.setCurrentNamespace(t.f0);
-			updater.accept(t.f1);
-		}
-	}
-
-	abstract GV getMergeResult(
-		List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, UV>> finalUpdatesToMerge);
-
-	@SuppressWarnings("unchecked")
-	abstract static class TtlIntegerMergingStateBase<
-		S extends InternalMergingState<?, String, ?, ?, GV>,
-		UV extends Number, GV>
-		extends TtlMergingStateBase<S, UV, GV> {
-		@Override
-		UV generateRandomUpdate() {
-			return (UV) (Integer) RANDOM.nextInt(1000);
-		}
-
-		int getIntegerMergeResult(
-			List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
-			List<Tuple2<String, UV>> finalUpdatesToMerge) {
-			return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
-				finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
new file mode 100644
index 0000000..85c6134
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/** Test context for TTL merging states: the namespaces to merge and the update batches applied to them. */
+abstract class TtlMergingStateTestContext<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
+	extends TtlStateTestContextBase<S, UV, GV> {
+	static final Random RANDOM = new Random();
+
+	static final List<String> NAMESPACES = Arrays.asList(
+		"unsetNamespace1",
+		"unsetNamespace2",
+		"expiredNamespace",
+		"expiredAndUpdatedNamespace",
+		"unexpiredNamespace",
+		"finalNamespace");
+
+	/** Updates which {@code TtlStateTestBase#testMergeNamespaces} applies first and expects to be expired at merge time. */
+	List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()));
+	}
+
+	/** Updates applied second by the merge test; they are expected to contribute to the merge result. */
+	List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()));
+	}
+
+	/** Updates applied last by the merge test; they are expected to contribute to the merge result. */
+	List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate()));
+	}
+
+	abstract UV generateRandomUpdate();
+
+	void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
+		for (Tuple2<String, UV> t : updates) {
+			ttlState.setCurrentNamespace(t.f0);
+			update(t.f1);
+		}
+	}
+
+	abstract GV getMergeResult(
+		List<Tuple2<String, UV>> unexpiredUpdatesToMerge, List<Tuple2<String, UV>> finalUpdatesToMerge);
+
+	/** Variant for integer updates; the expected merge result is the sum of the given update values. */
+	abstract static class TtlIntegerMergingStateTestContext<
+		S extends InternalMergingState<?, String, ?, ?, GV>,
+		UV extends Number, GV>
+		extends TtlMergingStateTestContext<S, UV, GV> {
+		@Override
+		@SuppressWarnings("unchecked") // UV is only bounded by Number while the generated values are Integers
+		UV generateRandomUpdate() {
+			return (UV) (Integer) RANDOM.nextInt(1000);
+		}
+
+		int getIntegerMergeResult(
+			List<Tuple2<String, UV>> unexpiredUpdatesToMerge, List<Tuple2<String, UV>> finalUpdatesToMerge) {
+			return unexpiredUpdatesToMerge.stream().mapToInt(t -> t.f1.intValue()).sum() +
+				finalUpdatesToMerge.stream().mapToInt(t -> t.f1.intValue()).sum();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
deleted file mode 100644
index bc5f67f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.List;
-
-/** Test suite for {@link TtlReducingState}. */
-public class TtlReducingStateTest
-	extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlReducingState<?, String, Integer>, Integer, Integer> {
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.add(v);
-		getter = () -> ttlState.get();
-		originalGetter = () -> ttlState.original.get();
-
-		updateEmpty = 5;
-		updateUnexpired = 7;
-		updateExpired = 6;
-
-		getUpdateEmpty = 5;
-		getUnexpired = 12;
-		getUpdateExpired = 6;
-	}
-
-	@Override
-	TtlReducingState<?, String, Integer> createState() {
-		ReducingStateDescriptor<Integer> aggregatingStateDes =
-			new ReducingStateDescriptor<>("TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
-		return (TtlReducingState<?, String, Integer>) wrapMockState(aggregatingStateDes);
-	}
-
-	@Override
-	Integer getMergeResult(
-		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
-		return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
-	}
-
-	private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
-		if (v1 == null && v2 == null) {
-			return null;
-		} else if (v1 == null) {
-			return v2;
-		} else if (v2 == null) {
-			return v1;
-		} else {
-			return v1 + v2;
-		}
-	};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
new file mode 100644
index 0000000..ea5e61b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.List;
+
+/** Test context for {@link TtlReducingState}: integer values combined by a summing reduce function. */
+class TtlReducingStateTestContext
+	extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlReducingState<?, String, Integer>, Integer, Integer> {
+	@Override
+	void initTestValues() {
+		updateEmpty = 5;
+		updateUnexpired = 7;
+		updateExpired = 6;
+
+		getUpdateEmpty = 5;
+		getUnexpired = 12; // 5 + 7: the second update is reduced with the still unexpired first one
+		getUpdateExpired = 6; // the earlier value has expired, so only the new update is expected
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ReducingStateDescriptor<>(
+			"TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
+	}
+
+	@Override
+	void update(Integer value) throws Exception {
+		ttlState.add(value);
+	}
+
+	@Override
+	Integer get() throws Exception {
+		return ttlState.get();
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get();
+	}
+
+	@Override
+	Integer getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
+	}
+
+	private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> { // sum that treats null as a missing operand
+		if (v1 == null && v2 == null) {
+			return null;
+		} else if (v1 == null) {
+			return v2;
+		} else if (v2 == null) {
+			return v1;
+		} else {
+			return v1 + v2;
+		}
+	};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index bc3d6e7..5820f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -22,105 +22,153 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.function.SupplierWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.StateMigrationException;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeThat;
 
-abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+/** State TTL base test suite. */
+@RunWith(Parameterized.class)
+public abstract class TtlStateTestBase {
 	private static final long TTL = 100;
-	private static final KeyedStateFactory MOCK_ORIGINAL_STATE_FACTORY = new MockKeyedStateFactory();
 
-	S ttlState;
-	MockTimeProvider timeProvider;
-	StateTtlConfiguration ttlConfig;
+	private MockTtlTimeProvider timeProvider;
+	private StateBackendTestContext sbetc;
+	private StateTtlConfiguration ttlConfig;
 
-	ThrowingConsumer<UV, Exception> updater;
-	SupplierWithException<GV, Exception> getter;
-	SupplierWithException<?, Exception> originalGetter;
+	@Before
+	public void setup() {
+		timeProvider = new MockTtlTimeProvider();
+		sbetc = createStateBackendTestContext(timeProvider);
+	}
 
-	UV updateEmpty;
-	UV updateUnexpired;
-	UV updateExpired;
+	protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider);
+
+	@Parameterized.Parameter
+	public TtlStateTestContextBase<?, ?, ?> ctx;
+
+	@Parameterized.Parameters(name = "{0}")
+	public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
+		return Arrays.asList(
+			new TtlValueStateTestContext(),
+			new TtlListStateTestContext(),
+			new TtlMapStateAllEntriesTestContext(),
+			new TtlMapStatePerElementTestContext(),
+			new TtlAggregatingStateTestContext(),
+			new TtlReducingStateTestContext(),
+			new TtlFoldingStateTestContext());
+	}
 
-	GV getUpdateEmpty;
-	GV getUnexpired;
-	GV getUpdateExpired;
+	@SuppressWarnings("unchecked")
+	private <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
+		return (TtlStateTestContextBase<S, UV, ?>) ctx;
+	}
 
-	GV emptyValue = null;
+	@SuppressWarnings("unchecked")
+	private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
+		return (TtlMergingStateTestContext<?, UV, ?>) ctx;
+	}
 
-	void initTest() {
+	private void initTest() throws Exception {
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 	}
 
-	private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
+	private void initTest(
+				StateTtlConfiguration.TtlUpdateType updateType,
+				StateTtlConfiguration.TtlStateVisibility visibility) throws Exception {
 		initTest(updateType, visibility, TTL);
 	}
 
-	private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
-		timeProvider = new MockTimeProvider();
-		StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
-		ttlConfigBuilder.setTtlUpdateType(updateType)
-						.setStateVisibility(visibility)
-						.setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
-						.setTtl(Time.milliseconds(ttl));
-		ttlConfig = ttlConfigBuilder.build();
-		ttlState = createState();
-		initTestValues();
+	private void initTest(
+		StateTtlConfiguration.TtlUpdateType updateType,
+		StateTtlConfiguration.TtlStateVisibility visibility,
+		long ttl) throws Exception {
+		ttlConfig = StateTtlConfiguration
+			.newBuilder(Time.milliseconds(ttl))
+			.setTtlUpdateType(updateType)
+			.setStateVisibility(visibility)
+			.build();
+		sbetc.createAndRestoreKeyedStateBackend();
+		sbetc.restoreSnapshot(null);
+		createState();
+		ctx().initTestValues();
 	}
 
-	abstract S createState();
-
-	<SV, US extends State, IS extends US> IS wrapMockState(StateDescriptor<IS, SV> stateDesc) {
-		try {
-			return TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
-				StringSerializer.INSTANCE, stateDesc,
-				MOCK_ORIGINAL_STATE_FACTORY, ttlConfig, timeProvider);
-		} catch (Exception e) {
-			throw new FlinkRuntimeException("Unexpected exception wrapping mock state", e);
-		}
+	@SuppressWarnings("unchecked")
+	private <S extends State> void createState() throws Exception {
+		StateDescriptor<S, Object> stateDescriptor = ctx().createStateDescriptor();
+		stateDescriptor.enableTimeToLive(ttlConfig); // apply the TTL configuration built in initTest
+		ctx().ttlState = // the context keeps the state handle that its update()/get() operate on
+			(InternalKvState<?, String, ?>) sbetc.createState(stateDescriptor, "defaultNamespace");
 	}
 
-	abstract void initTestValues();
+	private void takeAndRestoreSnapshot() throws Exception { // round-trips the current state through a snapshot
+		KeyedStateHandle snapshot = sbetc.takeSnapshot();
+		sbetc.createAndRestoreKeyedStateBackend();
+		sbetc.restoreSnapshot(snapshot);
+		createState(); // re-create the state under test so that ctx().ttlState is replaced after the restore
+	}
 
 	@Test
 	public void testNonExistentValue() throws Exception {
 		initTest();
-		assertEquals("Non-existing state should be empty", emptyValue, getter.get());
+		assertEquals("Non-existing state should be empty", ctx().emptyValue, ctx().get());
 	}
 
 	@Test
 	public void testExactExpirationOnWrite() throws Exception {
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 
+		takeAndRestoreSnapshot();
+
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 20;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		updater.accept(updateUnexpired);
+		ctx().update(ctx().updateUnexpired);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 120;
-		assertEquals("Unexpired state should be available after update", getUnexpired, getter.get());
+		assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 170;
-		updater.accept(updateExpired);
+		ctx().update(ctx().updateExpired);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 220;
-		assertEquals("Unexpired state should be available after update", getUpdateExpired, getter.get());
+		assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 300;
-		assertEquals("Expired state should be unavailable", emptyValue, getter.get());
-		assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
 	}
 
 	@Test
@@ -128,11 +176,14 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
 
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 120;
-		assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
-		assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+		assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+		assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
+		assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
 	}
 
 	@Test
@@ -140,17 +191,23 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 120;
-		assertEquals("Unexpired state should be available after read", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available after read", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 250;
-		assertEquals("Expired state should be unavailable", emptyValue, getter.get());
-		assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
 	}
 
 	@Test
@@ -158,14 +215,18 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
 
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 170;
-		assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
-		assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+		assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+		assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
 	}
 
 	@Test
@@ -173,9 +234,154 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
 
 		timeProvider.time = 10;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 50;
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+	}
+
+	@Test
+	public void testMergeNamespaces() throws Exception {
+		assumeThat(ctx, instanceOf(TtlMergingStateTestContext.class)); // runs only for merging state contexts
+
+		initTest();
+
+		timeProvider.time = 0; // first batch: expected to be expired when the merge happens
+		List<Tuple2<String, Object>> expiredUpdatesToMerge = mctx().generateExpiredUpdatesToMerge();
+		mctx().applyStateUpdates(expiredUpdatesToMerge);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 120; // second batch, applied after the first one is older than the TTL (100)
+		List<Tuple2<String, Object>> unexpiredUpdatesToMerge = mctx().generateUnexpiredUpdatesToMerge();
+		mctx().applyStateUpdates(unexpiredUpdatesToMerge);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 150; // last batch
+		List<Tuple2<String, Object>> finalUpdatesToMerge = mctx().generateFinalUpdatesToMerge();
+		mctx().applyStateUpdates(finalUpdatesToMerge);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 230; // merge time: only the second and last batch are expected in the result
+		mctx().ttlState.mergeNamespaces("targetNamespace", TtlMergingStateTestContext.NAMESPACES);
+		mctx().ttlState.setCurrentNamespace("targetNamespace");
+		assertEquals("Unexpected result of merge operation",
+			mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), mctx().get());
+	}
+
+	@Test
+	public void testMultipleKeys() throws Exception {
+		testMultipleStateIds(id -> sbetc.setCurrentKey(id));
+	}
+
+	@Test
+	public void testMultipleNamespaces() throws Exception {
+		testMultipleStateIds(id -> ctx().ttlState.setCurrentNamespace(id));
+	}
+
+	private void testMultipleStateIds(Consumer<String> idChanger) throws Exception { // idChanger selects the key or namespace
+		initTest();
+
+		timeProvider.time = 0;
+		idChanger.accept("id2");
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		idChanger.accept("id1");
+		ctx().update(ctx().updateEmpty);
+		idChanger.accept("id2");
+		ctx().update(ctx().updateUnexpired);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 120; // both ids were last written at 50, which is still within the TTL (100)
+		idChanger.accept("id1");
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+		idChanger.accept("id2");
+		assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 170; // the value written to id2 at 50 has expired by now
+		idChanger.accept("id2");
+		ctx().update(ctx().updateExpired);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 230; // id1 (written at 50) has expired, id2 (written at 170) has not
+		idChanger.accept("id1");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		idChanger.accept("id2");
+		assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 300; // the write to id2 at 170 has expired as well
+		idChanger.accept("id1");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		idChanger.accept("id2");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+	}
+
+	@Test
+	public void testSnapshotChangeRestore() throws Exception {
+		initTest();
+
+		timeProvider.time = 0;
+		sbetc.setCurrentKey("k1");
+		ctx().update(ctx().updateEmpty);
+
+		timeProvider.time = 50;
+		sbetc.setCurrentKey("k1");
+		ctx().update(ctx().updateUnexpired);
+
+		timeProvider.time = 100;
+		sbetc.setCurrentKey("k2");
+		ctx().update(ctx().updateEmpty);
+
+		KeyedStateHandle snapshot = sbetc.takeSnapshot(); // taken with k1 last written at 50 and k2 written at 100
+
+		timeProvider.time = 170; // the writes below happen after the snapshot; the assertions expect them to be gone
+		sbetc.setCurrentKey("k1");
+		ctx().update(ctx().updateExpired);
+		sbetc.setCurrentKey("k2");
+		ctx().update(ctx().updateUnexpired);
+
+		sbetc.createAndRestoreKeyedStateBackend();
+		sbetc.restoreSnapshot(snapshot);
+		createState();
+
+		timeProvider.time = 180; // with TTL 100: k1 (written at 50) has expired, k2 (written at 100) has not
+		sbetc.setCurrentKey("k1");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		sbetc.setCurrentKey("k2");
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+	}
+
+	@Test(expected = StateMigrationException.class)
+	public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
+		assumeThat(this, not(instanceOf(MockTtlStateTest.class))); // skipped when run as MockTtlStateTest
+
+		initTest();
+
+		timeProvider.time = 0;
+		ctx().update(ctx().updateEmpty);
+
+		KeyedStateHandle snapshot = sbetc.takeSnapshot(); // snapshot of state that was registered with TTL enabled
+		sbetc.createAndRestoreKeyedStateBackend();
+
+		sbetc.restoreSnapshot(snapshot);
+		sbetc.createState(ctx().createStateDescriptor(), ""); // TTL is not enabled on this descriptor; must throw
+	}
+
+	@After
+	public void tearDown() {
+		sbetc.disposeKeyedStateBackend();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
new file mode 100644
index 0000000..d40bfb0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+abstract class TtlStateTestContextBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+	S ttlState;
+
+	UV updateEmpty;
+	UV updateUnexpired;
+	UV updateExpired;
+
+	GV getUpdateEmpty;
+	GV getUnexpired;
+	GV getUpdateExpired;
+
+	GV emptyValue = null;
+
+	abstract void initTestValues();
+
+	abstract <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor();
+
+	abstract void update(UV value) throws Exception;
+
+	abstract GV get() throws Exception;
+
+	abstract Object getOriginal() throws Exception;
+
+	@Override
+	public String toString() {
+		return this.getClass().getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
deleted file mode 100644
index 8d9a4b4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlValueState}. */
-public class TtlValueStateTest extends TtlStateTestBase<TtlValueState<?, String, String>, String, String> {
-	private static final String TEST_VAL1 = "test value1";
-	private static final String TEST_VAL2 = "test value2";
-	private static final String TEST_VAL3 = "test value3";
-
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.update(v);
-		getter = () -> ttlState.value();
-		originalGetter = () -> ttlState.original.value();
-
-		updateEmpty = TEST_VAL1;
-		updateUnexpired = TEST_VAL2;
-		updateExpired = TEST_VAL3;
-
-		getUpdateEmpty = TEST_VAL1;
-		getUnexpired = TEST_VAL2;
-		getUpdateExpired = TEST_VAL3;
-	}
-
-	@Override
-	TtlValueState<?, String, String> createState() {
-		ValueStateDescriptor<String> valueStateDesc =
-			new ValueStateDescriptor<>("TtlValueTestState", StringSerializer.INSTANCE);
-		return (TtlValueState<?, String, String>) wrapMockState(valueStateDesc);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
new file mode 100644
index 0000000..976d891
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test context for {@link TtlValueState}. */
+class TtlValueStateTestContext extends TtlStateTestContextBase<TtlValueState<?, String, String>, String, String> {
+	private static final String TEST_VAL1 = "test value1";
+	private static final String TEST_VAL2 = "test value2";
+	private static final String TEST_VAL3 = "test value3";
+
+	@Override
+	void initTestValues() {
+		updateEmpty = TEST_VAL1;
+		updateUnexpired = TEST_VAL2;
+		updateExpired = TEST_VAL3;
+
+		getUpdateEmpty = TEST_VAL1;
+		getUnexpired = TEST_VAL2;
+		getUpdateExpired = TEST_VAL3;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ValueStateDescriptor<>(
+			"TtlValueTestState", StringSerializer.INSTANCE);
+	}
+
+	@Override
+	void update(String value) throws Exception {
+		ttlState.update(value);
+	}
+
+	@Override
+	String get() throws Exception {
+		return ttlState.value();
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.value();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
index 439ca7f2..3f94669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
@@ -21,14 +21,12 @@ package org.apache.flink.runtime.state.ttl.mock;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Supplier;
 
 /** In memory mock internal state base class. */
-class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
-	private Map<N, T> namespacedValues = new HashMap<>();
-	private T defaultNamespaceValue;
+abstract class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
+	Supplier<Map<Object, Object>> values;
 	private N currentNamespace;
 	private final Supplier<T> emptyValue;
 
@@ -38,7 +36,6 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
 
 	MockInternalKvState(Supplier<T> emptyValue) {
 		this.emptyValue = emptyValue;
-		defaultNamespaceValue = emptyValue.get();
 	}
 
 	@Override
@@ -72,26 +69,20 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
 
 	@Override
 	public void clear() {
-		if (currentNamespace == null) {
-			defaultNamespaceValue = emptyValue.get();
-		} else {
-			namespacedValues.remove(currentNamespace);
-		}
+		getCurrentKeyValues().remove(currentNamespace);
 	}
 
+	@SuppressWarnings("unchecked")
 	public T getInternal() {
-		T value = currentNamespace == null ? defaultNamespaceValue :
-			namespacedValues.getOrDefault(currentNamespace, emptyValue.get());
-		updateInternal(value);
-		return value;
+		return (T) getCurrentKeyValues().computeIfAbsent(currentNamespace, n -> emptyValue.get());
 	}
 
 	@SuppressWarnings("WeakerAccess")
 	public void updateInternal(T valueToStore) {
-		if (currentNamespace == null) {
-			defaultNamespaceValue = valueToStore;
-		} else {
-			namespacedValues.put(currentNamespace, valueToStore);
-		}
+		getCurrentKeyValues().put(currentNamespace, valueToStore);
+	}
+
+	private Map<Object, Object> getCurrentKeyValues() {
+		return values.get();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 386ef97..9b5ac10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -68,21 +68,17 @@ public class MockInternalMapState<K, N, UK, UV>
 
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() {
-		return copy().entrySet();
-	}
-
-	private Map<UK, UV> copy() {
-		return new HashMap<>(getInternal());
+		return getInternal().entrySet();
 	}
 
 	@Override
 	public Iterable<UK> keys() {
-		return copy().keySet();
+		return getInternal().keySet();
 	}
 
 	@Override
 	public Iterable<UV> values() {
-		return copy().values();
+		return getInternal().values();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
new file mode 100644
index 0000000..363ecf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** State backend which produces in memory mock state objects. */
+public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+	@SuppressWarnings("deprecation")
+	private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
+			Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
+			Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
+			Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
+			Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
+			Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private final Map<String, Map<K, Map<Object, Object>>> stateValues = new HashMap<>();
+
+	MockKeyedStateBackend(
+		TaskKvStateRegistry kvStateRegistry,
+		TypeSerializer<K> keySerializer,
+		ClassLoader userCodeClassLoader,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		ExecutionConfig executionConfig,
+		TtlTimeProvider ttlTimeProvider) {
+		super(kvStateRegistry, keySerializer, userCodeClassLoader,
+			numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public <N, SV, S extends State, IS extends S> IS createInternalState(
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, SV> stateDesc) throws Exception {
+		KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+		if (stateFactory == null) {
+			String message = String.format("State %s is not supported by %s",
+				stateDesc.getClass(), TtlStateFactory.class);
+			throw new FlinkRuntimeException(message);
+		}
+		IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
+		((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
+			.computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
+			.computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
+		return state;
+	}
+
+	@Override
+	public int numStateEntries() {
+		int count = 0;
+		for (String state : stateValues.keySet()) {
+			for (K key : stateValues.get(state).keySet()) {
+				count += stateValues.get(state).get(key).size();
+			}
+		}
+		return count;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		// noop
+	}
+
+	@Override
+	public <N> Stream<K> getKeys(String state, N namespace) {
+		return stateValues.get(state).entrySet().stream()
+			.filter(e -> e.getValue().containsKey(namespace))
+			.map(Map.Entry::getKey);
+	}
+
+	@Override
+	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+		long checkpointId,
+		long timestamp,
+		CheckpointStreamFactory streamFactory,
+		CheckpointOptions checkpointOptions) {
+		return new FutureTask<>(() -> SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues))));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void restore(Collection<KeyedStateHandle> state) {
+		stateValues.clear();
+		state = state == null ? Collections.emptyList() : state;
+		state.forEach(ksh -> stateValues.putAll(copy(((MockKeyedStateHandle<K>) ksh).snapshotStates)));
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <K> Map<String, Map<K, Map<Object, Object>>> copy(
+		Map<String, Map<K, Map<Object, Object>>> stateValues) {
+		Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>();
+		for (String stateName : stateValues.keySet()) {
+			Map<K, Map<Object, Object>> keyedValues = snapshotStates.computeIfAbsent(stateName, s -> new HashMap<>());
+			for (K key : stateValues.get(stateName).keySet()) {
+				Map<Object, Object> values = keyedValues.computeIfAbsent(key, s -> new HashMap<>());
+				for (Object namespace : stateValues.get(stateName).get(key).keySet()) {
+					Object value = stateValues.get(stateName).get(key).get(namespace);
+					value = value instanceof List ? new ArrayList<>((List) value) : value;
+					value = value instanceof Map ? new HashMap<>((Map) value) : value;
+					values.put(namespace, value);
+				}
+			}
+		}
+		return snapshotStates;
+	}
+
+	@Nonnull
+	@Override
+	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T>
+	create(
+		@Nonnull String stateName,
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+		@Nonnull PriorityComparator<T> elementPriorityComparator,
+		@Nonnull KeyExtractorFunction<T> keyExtractor) {
+		return new HeapPriorityQueueSet<>(
+			elementPriorityComparator,
+			keyExtractor,
+			0,
+			keyGroupRange,
+			0);
+	}
+
+	private static class MockKeyedStateHandle<K> implements KeyedStateHandle {
+		private static final long serialVersionUID = 1L;
+
+		final Map<String, Map<K, Map<Object, Object>>> snapshotStates;
+
+		MockKeyedStateHandle(Map<String, Map<K, Map<Object, Object>>> snapshotStates) {
+			this.snapshotStates = snapshotStates;
+		}
+
+		@Override
+		public void discardState() {
+			snapshotStates.clear();
+		}
+
+		@Override
+		public long getStateSize() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void registerSharedStates(SharedStateRegistry stateRegistry) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public KeyGroupRange getKeyGroupRange() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
deleted file mode 100644
index d843352..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl.mock;
-
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyedStateFactory;
-import org.apache.flink.runtime.state.ttl.TtlStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** State factory which produces in memory mock state objects. */
-public class MockKeyedStateFactory implements KeyedStateFactory {
-	@SuppressWarnings("deprecation")
-	private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
-		Stream.of(
-			Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
-			Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
-			Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
-			Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
-			Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
-			Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
-		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-
-	@Override
-	public <N, SV, S extends State, IS extends S> IS createState(
-		TypeSerializer<N> namespaceSerializer,
-		StateDescriptor<S, SV> stateDesc) throws Exception {
-		KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
-		if (stateFactory == null) {
-			String message = String.format("State %s is not supported by %s",
-				stateDesc.getClass(), TtlStateFactory.class);
-			throw new FlinkRuntimeException(message);
-		}
-		return stateFactory.createState(namespaceSerializer, stateDesc);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
new file mode 100644
index 0000000..a8d49dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nullable;
+
+/** Mock state backend which creates {@link MockKeyedStateBackend} instances. */
+public class MockStateBackend extends AbstractStateBackend {
+	@Override
+	public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CheckpointStorage createCheckpointStorage(JobID jobId) {
+		return new CheckpointStorage() {
+			@Override
+			public boolean supportsHighlyAvailableStorage() {
+				return false;
+			}
+
+			@Override
+			public boolean hasDefaultSavepointLocation() {
+				return false;
+			}
+
+			@Override
+			public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+				return null;
+			}
+		};
+	}
+
+	@Override
+	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+		Environment env,
+		JobID jobID,
+		String operatorIdentifier,
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		TaskKvStateRegistry kvStateRegistry,
+		TtlTimeProvider ttlTimeProvider) {
+		return new MockKeyedStateBackend<>(
+			new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()),
+			keySerializer,
+			env.getUserClassLoader(),
+			numberOfKeyGroups,
+			keyGroupRange,
+			env.getExecutionConfig(),
+			ttlTimeProvider);
+	}
+
+	@Override
+	public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6b4e3e9..3bf0aea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
 import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
@@ -270,10 +271,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		boolean enableIncrementalCheckpointing,
 		LocalRecoveryConfig localRecoveryConfig,
-		RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType
+		RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
+		TtlTimeProvider ttlTimeProvider
 	) throws IOException {
 
-		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
+		super(kvStateRegistry, keySerializer, userCodeClassLoader,
+			numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
 
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 
@@ -1347,7 +1350,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, SV, S extends State, IS extends S> IS createState(
+	public <N, SV, S extends State, IS extends S> IS createInternalState(
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc) throws Exception {
 		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 998521b..1794e17 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TernaryBoolean;
 
@@ -410,7 +411,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws IOException {
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) throws IOException {
 
 		// first, make sure that the RocksDB JNI library is loaded
 		// we do this explicitly here to have better error handling
@@ -442,7 +444,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 				env.getExecutionConfig(),
 				isIncrementalCheckpointsEnabled(),
 				localRecoveryConfig,
-				priorityQueueStateType);
+				priorityQueueStateType,
+				ttlTimeProvider);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 69069d6..6b254ce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 
@@ -241,7 +242,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 				new ExecutionConfig(),
 				enableIncrementalCheckpointing,
 				TestLocalRecoveryConfig.disabled(),
-				RocksDBStateBackend.PriorityQueueStateType.HEAP);
+				RocksDBStateBackend.PriorityQueueStateType.HEAP,
+				TtlTimeProvider.DEFAULT);
 
 			verify(columnFamilyOptions, Mockito.times(1))
 				.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);


[3/3] flink git commit: [FLINK-9701] Introduce TTL configuration in state descriptors

Posted by sr...@apache.org.
[FLINK-9701] Introduce TTL configuration in state descriptors

This closes #6313.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f45b7f7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f45b7f7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f45b7f7f

Branch: refs/heads/master
Commit: f45b7f7ff27df019e9045895e718fa112d12139c
Parents: b407ba7
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Tue Jul 3 19:23:41 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jul 12 20:18:23 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java |  28 ++
 .../api/common/state/StateTtlConfiguration.java |   3 -
 .../common/typeutils/CompositeSerializer.java   |  56 ++-
 .../KVStateRequestSerializerRocksDBTest.java    |   9 +-
 .../network/KvStateRequestSerializerTest.java   |   9 +-
 .../network/KvStateServerHandlerTest.java       |   4 +-
 .../state/AbstractKeyedStateBackend.java        |  47 ++-
 .../runtime/state/AbstractStateBackend.java     |  16 +-
 .../flink/runtime/state/KeyedStateFactory.java  |   2 +-
 .../flink/runtime/state/StateBackend.java       |  39 ++-
 .../state/filesystem/FsStateBackend.java        |  21 +-
 .../state/heap/HeapKeyedStateBackend.java       |   9 +-
 .../state/memory/MemoryStateBackend.java        |   7 +-
 .../runtime/state/ttl/AbstractTtlDecorator.java |   2 -
 .../flink/runtime/state/ttl/TtlMapState.java    |  97 ++++--
 .../runtime/state/ttl/TtlReducingState.java     |   2 +-
 .../runtime/state/ttl/TtlStateFactory.java      |  22 +-
 .../runtime/state/ttl/TtlTimeProvider.java      |   4 +-
 .../CheckpointSettingsSerializableTest.java     |   4 +-
 ...HeapKeyedStateBackendAsyncByDefaultTest.java |   4 +-
 .../runtime/state/StateBackendTestBase.java     |  15 +-
 .../state/StateSnapshotCompressionTest.java     |  17 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |   4 +-
 .../state/heap/HeapStateBackendTestBase.java    |   4 +-
 .../runtime/state/ttl/HeapTtlStateTest.java     |  35 ++
 .../runtime/state/ttl/MockTimeProvider.java     |  28 --
 .../runtime/state/ttl/MockTtlStateTest.java     |  35 ++
 .../runtime/state/ttl/MockTtlTimeProvider.java  |  28 ++
 .../state/ttl/StateBackendTestContext.java      | 125 +++++++
 .../state/ttl/TtlAggregatingStateTest.java      |  90 -----
 .../ttl/TtlAggregatingStateTestContext.java     | 103 ++++++
 .../runtime/state/ttl/TtlFoldingStateTest.java  |  54 ---
 .../state/ttl/TtlFoldingStateTestContext.java   |  67 ++++
 .../runtime/state/ttl/TtlListStateTest.java     |  75 ----
 .../state/ttl/TtlListStateTestContext.java      |  87 +++++
 .../ttl/TtlMapStateAllEntriesTestContext.java   |  66 ++++
 .../state/ttl/TtlMapStatePerElementTest.java    |  42 ---
 .../ttl/TtlMapStatePerElementTestContext.java   |  53 +++
 .../runtime/state/ttl/TtlMapStateTest.java      |  55 ---
 .../runtime/state/ttl/TtlMapStateTestBase.java  |  33 --
 .../state/ttl/TtlMapStateTestContext.java       |  35 ++
 .../runtime/state/ttl/TtlMergingStateBase.java  | 126 -------
 .../state/ttl/TtlMergingStateTestContext.java   |  99 ++++++
 .../runtime/state/ttl/TtlReducingStateTest.java |  71 ----
 .../state/ttl/TtlReducingStateTestContext.java  |  84 +++++
 .../runtime/state/ttl/TtlStateTestBase.java     | 340 +++++++++++++++----
 .../state/ttl/TtlStateTestContextBase.java      |  52 +++
 .../runtime/state/ttl/TtlValueStateTest.java    |  51 ---
 .../state/ttl/TtlValueStateTestContext.java     |  64 ++++
 .../state/ttl/mock/MockInternalKvState.java     |  29 +-
 .../state/ttl/mock/MockInternalMapState.java    |  10 +-
 .../state/ttl/mock/MockKeyedStateBackend.java   | 218 ++++++++++++
 .../state/ttl/mock/MockKeyedStateFactory.java   |  64 ----
 .../state/ttl/mock/MockStateBackend.java        | 111 ++++++
 .../state/RocksDBKeyedStateBackend.java         |   9 +-
 .../streaming/state/RocksDBStateBackend.java    |   7 +-
 .../state/RocksDBStateBackendTest.java          |   4 +-
 .../streaming/state/RocksDBTtlStateTest.java    |  62 ++++
 .../StreamTaskStateInitializerImpl.java         |   4 +-
 .../StreamTaskStateInitializerImplTest.java     |   4 +-
 .../tasks/StreamTaskTerminationTest.java        |   4 +-
 .../tasks/TestSpyWrapperStateBackend.java       |   7 +-
 .../streaming/runtime/StateBackendITCase.java   |  16 +-
 63 files changed, 1933 insertions(+), 939 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 6c54e71..956fd05 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -92,6 +93,10 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	@Nullable
 	private String queryableStateName;
 
+	/** Configuration of state time-to-live (TTL); {@code null} unless set via {@link #enableTimeToLive}. */
+	@Nullable
+	private StateTtlConfiguration ttlConfig;
+
 	/** The default value returned by the state when no other value is bound to a key. */
 	@Nullable
 	protected transient T defaultValue;
@@ -203,6 +208,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 * @throws IllegalStateException If queryable state name already set
 	 */
 	public void setQueryable(String queryableStateName) {
+		Preconditions.checkArgument(ttlConfig == null,
+			"Queryable state is currently not supported with TTL");
 		if (this.queryableStateName == null) {
 			this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
 		} else {
@@ -230,6 +237,27 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 		return queryableStateName != null;
 	}
 
+	/**
+	 * Configures optional activation of state time-to-live (TTL).
+	 *
+	 * <p>State user value will expire, become unavailable and be cleaned up in storage
+	 * depending on configured {@link StateTtlConfiguration}.
+	 *
+	 * @param ttlConfig configuration of state TTL
+	 */
+	public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
+		Preconditions.checkNotNull(ttlConfig);
+		Preconditions.checkArgument(queryableStateName == null,
+			"Queryable state is currently not supported with TTL");
+		this.ttlConfig = ttlConfig;
+	}
+
+	@Nullable
+	@Internal
+	public StateTtlConfiguration getTtlConfig() {
+		return ttlConfig;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
index 8ef2046..9bd8b15 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -27,15 +27,12 @@ import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT
 
 /**
  * Configuration of state TTL logic.
- * TODO: builder
  */
 public class StateTtlConfiguration {
 	/**
 	 * This option value configures when to update last access timestamp which prolongs state TTL.
 	 */
 	public enum TtlUpdateType {
-		/** TTL is disabled. State does not expire. */
-		Disabled,
 		/** Last access timestamp is initialised when state is created and updated on every write operation. */
 		OnCreateAndWrite,
 		/** The same as <code>OnCreateAndWrite</code> but also updated on read. */

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
index 43c5533..2db7a30 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
@@ -50,7 +50,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 	@SuppressWarnings("unchecked")
 	protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... fieldSerializers) {
 		this(
-			new PrecomputedParameters(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers),
+			PrecomputedParameters.precompute(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers),
 			fieldSerializers);
 	}
 
@@ -187,6 +187,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 		return 31 * Boolean.hashCode(precomputed.immutableTargetType) + Arrays.hashCode(fieldSerializers);
 	}
 
+	@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
 	@Override
 	public boolean equals(Object obj) {
 		if (canEqual(obj)) {
@@ -205,17 +206,12 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		return new CompositeTypeSerializerConfigSnapshot(fieldSerializers) {
-			@Override
-			public int getVersion() {
-				return 0;
-			}
-		};
+		return new ConfigSnapshot(fieldSerializers);
 	}
 
 	@Override
 	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) {
+		if (configSnapshot instanceof ConfigSnapshot) {
 			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
 				((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
 			if (previousSerializersAndConfigs.size() == fieldSerializers.length) {
@@ -242,11 +238,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 				}
 			}
 		}
-		PrecomputedParameters precomputed =
-			new PrecomputedParameters(this.precomputed.immutableTargetType, convertSerializers);
-		return requiresMigration ?
-			CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)) :
-			CompatibilityResult.compatible();
+		return requiresMigration ? createMigrationCompatResult(convertSerializers) : CompatibilityResult.compatible();
 	}
 
 	private CompatibilityResult<Object> resolveFieldCompatibility(
@@ -256,6 +248,12 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 			previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
 	}
 
+	private CompatibilityResult<T> createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) {
+		PrecomputedParameters precomputed =
+			PrecomputedParameters.precompute(this.precomputed.immutableTargetType, convertSerializers);
+		return CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers));
+	}
+
 	/** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
 	protected static class PrecomputedParameters implements Serializable {
 		private static final long serialVersionUID = 1L;
@@ -272,7 +270,14 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 		/** Whether any field serializer is stateful. */
 		final boolean stateful;
 
-		PrecomputedParameters(
+		private PrecomputedParameters(boolean immutableTargetType, boolean immutable, int length, boolean stateful) {
+			this.immutableTargetType = immutableTargetType;
+			this.immutable = immutable;
+			this.length = length;
+			this.stateful = stateful;
+		}
+
+		static PrecomputedParameters precompute(
 			boolean immutableTargetType,
 			TypeSerializer<Object>[] fieldSerializers) {
 			Preconditions.checkNotNull(fieldSerializers);
@@ -292,11 +297,26 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 				}
 				totalLength = totalLength >= 0 ? totalLength + fieldSerializer.getLength() : totalLength;
 			}
+			return new PrecomputedParameters(immutableTargetType, fieldsImmutable, totalLength, stateful);
+		}
+	}
 
-			this.immutableTargetType = immutableTargetType;
-			this.immutable = immutableTargetType && fieldsImmutable;
-			this.length = totalLength;
-			this.stateful = stateful;
+	/** Snapshot field serializers of composite type. */
+	public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+		private static final int VERSION = 0;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		@SuppressWarnings("unused")
+		public ConfigSnapshot() {
+		}
+
+		ConfigSnapshot(@Nonnull TypeSerializer<?>... nestedSerializers) {
+			super(nestedSerializers);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 9ea3198..176e55a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,12 +81,13 @@ public final class KVStateRequestSerializerRocksDBTest {
 				new ExecutionConfig(),
 				false,
 				TestLocalRecoveryConfig.disabled(),
-				RocksDBStateBackend.PriorityQueueStateType.HEAP
+				RocksDBStateBackend.PriorityQueueStateType.HEAP,
+				TtlTimeProvider.DEFAULT
 			);
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE,
 				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
 		KvStateRequestSerializerTest.testListSerialization(key, listState);
@@ -121,7 +123,8 @@ public final class KVStateRequestSerializerRocksDBTest {
 				new ExecutionConfig(),
 				false,
 				TestLocalRecoveryConfig.disabled(),
-				RocksDBStateBackend.PriorityQueueStateType.HEAP);
+				RocksDBStateBackend.PriorityQueueStateType.HEAP,
+				TtlTimeProvider.DEFAULT);
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 73f8831..d539066 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -198,11 +199,12 @@ public class KvStateRequestSerializerTest {
 				async,
 				new ExecutionConfig(),
 				TestLocalRecoveryConfig.disabled(),
-				new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
+				new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
+				TtlTimeProvider.DEFAULT
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
+		final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(
 			VoidNamespaceSerializer.INSTANCE,
 			new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
 
@@ -306,7 +308,8 @@ public class KvStateRequestSerializerTest {
 				async,
 				new ExecutionConfig(),
 				TestLocalRecoveryConfig.disabled(),
-				new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
+				new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
+				TtlTimeProvider.DEFAULT
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 9947dac..adcf3ae 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -761,6 +762,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 			IntSerializer.INSTANCE,
 			numKeyGroups,
 			new KeyGroupRange(0, 0),
-			registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+			registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()),
+			TtlTimeProvider.DEFAULT);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 8ce25b6..c7f1bd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -84,6 +86,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
 
 	private final ExecutionConfig executionConfig;
 
+	private final TtlTimeProvider ttlTimeProvider;
+
 	/** Decorates the input and output streams to write key-groups compressed. */
 	protected final StreamCompressionDecorator keyGroupCompressionDecorator;
 
@@ -93,7 +97,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		ClassLoader userCodeClassLoader,
 		int numberOfKeyGroups,
 		KeyGroupRange keyGroupRange,
-		ExecutionConfig executionConfig) {
+		ExecutionConfig executionConfig,
+		TtlTimeProvider ttlTimeProvider) {
 
 		this.kvStateRegistry = kvStateRegistry;
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -104,6 +109,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		this.keyValueStatesByName = new HashMap<>();
 		this.executionConfig = executionConfig;
 		this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
+		this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
 	}
 
 	private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
@@ -220,40 +226,33 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	public <N, S extends State, V> S getOrCreateKeyedState(
 			final TypeSerializer<N> namespaceSerializer,
 			StateDescriptor<S, V> stateDescriptor) throws Exception {
-
 		checkNotNull(namespaceSerializer, "Namespace serializer");
+		checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
+				"This operation cannot use partitioned state.");
 
-		if (keySerializer == null) {
-			throw new UnsupportedOperationException(
-					"State key serializer has not been configured in the config. " +
-					"This operation cannot use partitioned state.");
-		}
-
-		if (!stateDescriptor.isSerializerInitialized()) {
-			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-		}
-
-		InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName());
-		if (existing != null) {
-			@SuppressWarnings("unchecked")
-			S typedState = (S) existing;
-			return typedState;
+		InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
+		if (kvState == null) {
+			if (!stateDescriptor.isSerializerInitialized()) {
+				stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+			}
+			kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
+				namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
+			keyValueStatesByName.put(stateDescriptor.getName(), kvState);
+			publishQueryableStateIfEnabled(stateDescriptor, kvState);
 		}
+		return (S) kvState;
+	}
 
-		InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
-		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
-
-		// Publish queryable state
+	private void publishQueryableStateIfEnabled(
+		StateDescriptor<?, ?> stateDescriptor,
+		InternalKvState<?, ?, ?> kvState) {
 		if (stateDescriptor.isQueryable()) {
 			if (kvStateRegistry == null) {
 				throw new IllegalStateException("State backend has not been initialized for job.");
 			}
-
 			String name = stateDescriptor.getQueryableStateName();
 			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
 		}
-
-		return (S) kvState;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 7e9c357..d397a88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import java.io.IOException;
 
@@ -42,13 +43,14 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 
 	@Override
 	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-			Environment env,
-			JobID jobID,
-			String operatorIdentifier,
-			TypeSerializer<K> keySerializer,
-			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws IOException;
+		Environment env,
+		JobID jobID,
+		String operatorIdentifier,
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		TaskKvStateRegistry kvStateRegistry,
+		TtlTimeProvider ttlTimeProvider) throws IOException;
 
 	@Override
 	public abstract OperatorStateBackend createOperatorStateBackend(

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
index dd251bd..de35979 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
@@ -36,7 +36,7 @@ public interface KeyedStateFactory {
 	 * @param <S> The type of the public API state.
 	 * @param <IS> The type of internal state.
 	 */
-	<N, SV, S extends State, IS extends S> IS createState(
+	<N, SV, S extends State, IS extends S> IS createInternalState(
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index f34cd9b..2775b71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import java.io.IOException;
 
@@ -117,7 +118,7 @@ public interface StateBackend extends java.io.Serializable {
 
 	/**
 	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
-	 * and checkpointing it.
+	 * and checkpointing it. Uses default TTL time provider.
 	 *
 	 * <p><i>Keyed State</i> is state where each value is bound to a key.
 	 *
@@ -127,14 +128,46 @@ public interface StateBackend extends java.io.Serializable {
 	 *
 	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
 	 */
-	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+	default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws Exception;
+			TaskKvStateRegistry kvStateRegistry) throws Exception {
+		return createKeyedStateBackend(
+			env,
+			jobID,
+			operatorIdentifier,
+			keySerializer,
+			numberOfKeyGroups,
+			keyGroupRange,
+			kvStateRegistry,
+			TtlTimeProvider.DEFAULT);
+	}
+
+	/**
+	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
+	 * and checkpointing it, using the given {@code ttlTimeProvider} as the time source for state TTL.
+	 *
+	 * <p><i>Keyed State</i> is state where each value is bound to a key.
+	 *
+	 * @param <K> The type of the keys by which the state is organized.
+	 *
+	 * @return The Keyed State Backend for the given job, operator, and key group range.
+	 *
+	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
+	 */
+	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+		Environment env,
+		JobID jobID,
+		String operatorIdentifier,
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		TaskKvStateRegistry kvStateRegistry,
+		TtlTimeProvider ttlTimeProvider) throws Exception;
 	
 	/**
 	 * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index ad1581b..f5a86e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TernaryBoolean;
 
 import org.slf4j.LoggerFactory;
@@ -93,7 +94,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 	private static final long serialVersionUID = -8191916350224044011L;
 
 	/** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
-	public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+	private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
 
 	// ------------------------------------------------------------------------
 
@@ -448,13 +449,14 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 
 	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-			Environment env,
-			JobID jobID,
-			String operatorIdentifier,
-			TypeSerializer<K> keySerializer,
-			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) {
+		Environment env,
+		JobID jobID,
+		String operatorIdentifier,
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		TaskKvStateRegistry kvStateRegistry,
+		TtlTimeProvider ttlTimeProvider) {
 
 		TaskStateManager taskStateManager = env.getTaskStateManager();
 		LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
@@ -470,7 +472,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 				isUsingAsynchronousSnapshots(),
 				env.getExecutionConfig(),
 				localRecoveryConfig,
-				priorityQueueSetFactory);
+				priorityQueueSetFactory,
+				ttlTimeProvider);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 562c93d..495dfe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -171,9 +172,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			boolean asynchronousSnapshots,
 			ExecutionConfig executionConfig,
 			LocalRecoveryConfig localRecoveryConfig,
-			PriorityQueueSetFactory priorityQueueSetFactory) {
+			PriorityQueueSetFactory priorityQueueSetFactory,
+			TtlTimeProvider ttlTimeProvider) {
 
-		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
+		super(kvStateRegistry, keySerializer, userCodeClassLoader,
+			numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
 		this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
 
 		SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = asynchronousSnapshots ?
@@ -241,7 +244,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, SV, S extends State, IS extends S> IS createState(
+	public <N, SV, S extends State, IS extends S> IS createInternalState(
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc) throws Exception {
 		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index d78944c..1c464d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nullable;
@@ -307,7 +308,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) {
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) {
 
 		TaskStateManager taskStateManager = env.getTaskStateManager();
 		HeapPriorityQueueSetFactory priorityQueueSetFactory =
@@ -321,7 +323,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
 				isUsingAsynchronousSnapshots(),
 				env.getExecutionConfig(),
 				taskStateManager.createLocalRecoveryConfig(),
-				priorityQueueSetFactory);
+				priorityQueueSetFactory,
+				ttlTimeProvider);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 29a575a..1b72c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -55,8 +55,6 @@ abstract class AbstractTtlDecorator<T> {
 		Preconditions.checkNotNull(original);
 		Preconditions.checkNotNull(config);
 		Preconditions.checkNotNull(timeProvider);
-		Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled,
-			"State does not need to be wrapped with TTL if it is configured as disabled.");
 		this.original = original;
 		this.config = config;
 		this.timeProvider = timeProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 98d7c52..21145e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -23,13 +23,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import javax.annotation.Nonnull;
+
 import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
 
 /**
  * This class wraps map state with TTL logic.
@@ -84,52 +86,89 @@ class TtlMapState<K, N, UK, UV>
 
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-		return entriesStream()::iterator;
+		return entries(e -> e);
 	}
 
-	private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+	private <R> Iterable<R> entries(
+		Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
 		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
-		withTs = withTs == null ? Collections.emptyList() : withTs;
-		return StreamSupport
-			.stream(withTs.spliterator(), false)
-			.filter(this::unexpiredAndUpdateOrCleanup)
-			.map(TtlMapState::unwrapWithoutTs);
-	}
-
-	private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
-		UV unexpiredValue;
-		try {
-			unexpiredValue = getWithTtlCheckAndUpdate(
-				e::getValue,
-				v -> original.put(e.getKey(), v),
-				() -> original.remove(e.getKey()));
-		} catch (Exception ex) {
-			throw new FlinkRuntimeException(ex);
-		}
-		return unexpiredValue != null;
-	}
-
-	private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
-		return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+		return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
 	}
 
 	@Override
 	public Iterable<UK> keys() throws Exception {
-		return entriesStream().map(Map.Entry::getKey)::iterator;
+		return entries(Map.Entry::getKey);
 	}
 
 	@Override
 	public Iterable<UV> values() throws Exception {
-		return entriesStream().map(Map.Entry::getValue)::iterator;
+		return entries(Map.Entry::getValue);
 	}
 
 	@Override
 	public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-		return entriesStream().iterator();
+		return entries().iterator();
 	}
 
 	@Override
 	public void clear() {
 		original.clear();
 	}
+
+	private class EntriesIterator<R> implements Iterator<R> { // filters wrapped entries through the TTL check, then applies resultMapper
+		private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator; // obtained from withTs in the constructor
+		private final Function<Map.Entry<UK, UV>, R> resultMapper; // converts a surviving entry into the element type R
+		private Map.Entry<UK, UV> nextUnexpired = null; // look-ahead filled by hasNext() and consumed by next()
+		private boolean rightAfterNextIsCalled = false; // true only between next() and the following hasNext(); gates remove()
+
+		private EntriesIterator(
+			@Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+			@Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+			this.originalIterator = withTs.iterator();
+			this.resultMapper = resultMapper;
+		}
+
+		@Override
+		public boolean hasNext() {
+			rightAfterNextIsCalled = false; // the loop below may advance originalIterator, so remove() must be refused afterwards
+			while (nextUnexpired == null && originalIterator.hasNext()) { // keep pulling until an entry passes or input is exhausted
+				nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next()); // stays null when the entry is dropped
+			}
+			return nextUnexpired != null;
+		}
+
+		@Override
+		public R next() {
+			if (hasNext()) { // also fills nextUnexpired when it is still empty
+				rightAfterNextIsCalled = true; // set after hasNext(), which has just reset it
+				R result = resultMapper.apply(nextUnexpired);
+				nextUnexpired = null; // force the next hasNext() to fetch a fresh entry
+				return result;
+			}
+			throw new NoSuchElementException();
+		}
+
+		@Override
+		public void remove() {
+			if (rightAfterNextIsCalled) {
+				originalIterator.remove(); // originalIterator's last returned entry is the one next() just handed out
+			} else {
+				throw new IllegalStateException("next() has not been called or hasNext() has been called afterwards," +
+					" remove() is supported only right after calling next()");
+			}
+		}
+
+		private Map.Entry<UK, UV> getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) { // returns null when the TTL check yields no value
+			UV unexpiredValue;
+			try {
+				unexpiredValue = getWithTtlCheckAndUpdate( // defined outside this view; argument roles below are presumed from its name - TODO confirm
+					e::getValue, // supplies the stored TtlValue
+					v -> original.put(e.getKey(), v), // presumably the write-back used when the timestamp is refreshed
+					originalIterator::remove); // presumably the expired-entry cleanup; removes via the iterator, not original.remove(key)
+			} catch (Exception ex) {
+				throw new FlinkRuntimeException(ex); // hasNext()/next() declare no checked exceptions, so rethrow unchecked
+			}
+			return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
index 01e4be9..c0aa465 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -49,7 +49,7 @@ class TtlReducingState<K, N, T>
 
 	@Override
 	public void add(T value) throws Exception {
-		original.add(wrapWithTs(value, Long.MAX_VALUE));
+		original.add(wrapWithTs(value));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 82096a6..5909ac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -49,16 +49,14 @@ public class TtlStateFactory {
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc,
 		KeyedStateFactory originalStateFactory,
-		StateTtlConfiguration ttlConfig,
 		TtlTimeProvider timeProvider) throws Exception {
 		Preconditions.checkNotNull(namespaceSerializer);
 		Preconditions.checkNotNull(stateDesc);
 		Preconditions.checkNotNull(originalStateFactory);
-		Preconditions.checkNotNull(ttlConfig);
 		Preconditions.checkNotNull(timeProvider);
-		return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ?
-			originalStateFactory.createState(namespaceSerializer, stateDesc) :
-			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+		return stateDesc.getTtlConfig() == null ?
+			originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
+			new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider)
 				.createState(namespaceSerializer, stateDesc);
 	}
 
@@ -96,7 +94,7 @@ public class TtlStateFactory {
 				stateDesc.getClass(), TtlStateFactory.class);
 			throw new FlinkRuntimeException(message);
 		}
-		return stateFactory.createState(namespaceSerializer, stateDesc);
+		return stateFactory.createInternalState(namespaceSerializer, stateDesc);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -106,7 +104,7 @@ public class TtlStateFactory {
 		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
 			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
 		return (IS) new TtlValueState<>(
-			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
 			ttlConfig, timeProvider, stateDesc.getSerializer());
 	}
 
@@ -118,7 +116,7 @@ public class TtlStateFactory {
 		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
 			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
 		return (IS) new TtlListState<>(
-			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
 			ttlConfig, timeProvider, listStateDesc.getSerializer());
 	}
 
@@ -132,7 +130,7 @@ public class TtlStateFactory {
 			mapStateDesc.getKeySerializer(),
 			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
 		return (IS) new TtlMapState<>(
-			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
 			ttlConfig, timeProvider, mapStateDesc.getSerializer());
 	}
 
@@ -146,7 +144,7 @@ public class TtlStateFactory {
 			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
 			new TtlSerializer<>(stateDesc.getSerializer()));
 		return (IS) new TtlReducingState<>(
-			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
 			ttlConfig, timeProvider, stateDesc.getSerializer());
 	}
 
@@ -161,7 +159,7 @@ public class TtlStateFactory {
 		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
 			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
 		return (IS) new TtlAggregatingState<>(
-			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
 			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
 	}
 
@@ -178,7 +176,7 @@ public class TtlStateFactory {
 			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
 			new TtlSerializer<>(stateDesc.getSerializer()));
 		return (IS) new TtlFoldingState<>(
-			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+			originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
 			ttlConfig, timeProvider, stateDesc.getSerializer());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
index bac9d36..84809cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.state.ttl;
 /**
  * Provides time to TTL logic to judge about state expiration.
  */
-interface TtlTimeProvider {
+public interface TtlTimeProvider {
+	TtlTimeProvider DEFAULT = System::currentTimeMillis;
+
 	long currentTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 9456f10..ffebc52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -168,7 +169,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws Exception {
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
index 1325431..6424a7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 
 import org.junit.Rule;
@@ -70,7 +71,8 @@ public class HeapKeyedStateBackendAsyncByDefaultTest {
 			IntSerializer.INSTANCE,
 			1,
 			new KeyGroupRange(0, 0),
-			null
+			null,
+			TtlTimeProvider.DEFAULT
 		);
 
 		assertTrue(keyedStateBackend.supportsAsynchronousSnapshots());

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3c5756b..bfdc05d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -73,6 +73,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.types.IntValue;
@@ -182,7 +183,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				keySerializer,
 				numberOfKeyGroups,
 				keyGroupRange,
-				env.getTaskKvStateRegistry());
+				env.getTaskKvStateRegistry(),
+			    TtlTimeProvider.DEFAULT);
 
 		backend.restore(null);
 
@@ -219,7 +221,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			keySerializer,
 			numberOfKeyGroups,
 			keyGroupRange,
-			env.getTaskKvStateRegistry());
+			env.getTaskKvStateRegistry(),
+			TtlTimeProvider.DEFAULT);
 
 		backend.restore(new StateObjectCollection<>(state));
 
@@ -3545,7 +3548,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 			// insert some data to the backend.
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
 				VoidNamespaceSerializer.INSTANCE,
 				new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3602,7 +3605,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			backend = createKeyedBackend(IntSerializer.INSTANCE);
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3649,7 +3652,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		try {
 			backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
 
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 
@@ -3791,7 +3794,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				return;
 			}
 
-			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+			InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
 					VoidNamespaceSerializer.INSTANCE,
 					new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index dfcdffc..558f629 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.io.IOUtils;
@@ -54,7 +55,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class));
+			mock(PriorityQueueSetFactory.class),
+			TtlTimeProvider.DEFAULT);
 
 		try {
 			Assert.assertTrue(
@@ -77,7 +79,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class));
+			mock(PriorityQueueSetFactory.class),
+			TtlTimeProvider.DEFAULT);
 
 		try {
 			Assert.assertTrue(
@@ -118,12 +121,13 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class));
+			mock(PriorityQueueSetFactory.class),
+			TtlTimeProvider.DEFAULT);
 
 		try {
 
 			InternalValueState<String, VoidNamespace, String> state =
-				stateBackend.createState(new VoidNamespaceSerializer(), stateDescriptor);
+				stateBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor);
 
 			stateBackend.setCurrentKey("A");
 			state.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -160,12 +164,13 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class));
+			mock(PriorityQueueSetFactory.class),
+			TtlTimeProvider.DEFAULT);
 		try {
 
 			stateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
-			InternalValueState<String, VoidNamespace, String> state = stateBackend.createState(
+			InternalValueState<String, VoidNamespace, String> state = stateBackend.createInternalState(
 				new VoidNamespaceSerializer(),
 				stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 249d0c3..7b8d69f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -73,7 +73,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 
 			keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
 
-			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
+			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
 
 			keyedBackend.setCurrentKey("abc");
 			state.setCurrentNamespace(namespace1);
@@ -233,7 +233,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 			final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
 			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			InternalListState<String, Integer, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
+			InternalListState<String, Integer, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
 
 			assertEquals(7, keyedBackend.numStateEntries());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index cf6aef4..0eddf3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -61,6 +62,7 @@ public abstract class HeapStateBackendTestBase {
 			async,
 			new ExecutionConfig(),
 			TestLocalRecoveryConfig.disabled(),
-			new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128));
+			new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128),
+			TtlTimeProvider.DEFAULT);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
new file mode 100644
index 0000000..06de4be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+/** Test suite for heap state TTL. */
+public class HeapTtlStateTest extends TtlStateTestBase {
+	@Override
+	protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
+		return new StateBackendTestContext(timeProvider) {
+			@Override
+			protected StateBackend createStateBackend() {
+				return new MemoryStateBackend(false);
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
deleted file mode 100644
index e14c3f8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-class MockTimeProvider implements TtlTimeProvider {
-	long time = 0;
-
-	@Override
-	public long currentTimestamp() {
-		return time;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
new file mode 100644
index 0000000..392bdba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+
+/** Test suite for mock state TTL. */
+public class MockTtlStateTest extends TtlStateTestBase {
+	@Override
+	protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
+		return new StateBackendTestContext(timeProvider) {
+			@Override
+			protected StateBackend createStateBackend() {
+				return new MockStateBackend();
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
new file mode 100644
index 0000000..f980043
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+class MockTtlTimeProvider implements TtlTimeProvider {
+	long time = 0;
+
+	@Override
+	public long currentTimestamp() {
+		return time;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
new file mode 100644
index 0000000..eaec234
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+	private final StateBackend stateBackend;
+	private final CheckpointStorageLocation checkpointStorageLocation;
+	private final TtlTimeProvider timeProvider;
+
+	private AbstractKeyedStateBackend<String> keyedStateBackend;
+
+	protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+		this.timeProvider = Preconditions.checkNotNull(timeProvider);
+		this.stateBackend = Preconditions.checkNotNull(createStateBackend());
+		this.checkpointStorageLocation = createCheckpointStorageLocation();
+	}
+
+	protected abstract StateBackend createStateBackend();
+
+	private CheckpointStorageLocation createCheckpointStorageLocation() {
+		try {
+			return stateBackend
+				.createCheckpointStorage(new JobID())
+				.initializeLocationForCheckpoint(2L);
+		} catch (IOException e) {
+			throw new RuntimeException("unexpected", e); // chain the cause so the underlying I/O failure stays visible
+		}
+	}
+
+	void createAndRestoreKeyedStateBackend() {
+		Environment env = new DummyEnvironment();
+		try {
+			disposeKeyedStateBackend();
+			keyedStateBackend = stateBackend.createKeyedStateBackend(
+				env, new JobID(), "test", StringSerializer.INSTANCE, 10,
+				new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider);
+			keyedStateBackend.setCurrentKey("defaultKey");
+		} catch (Exception e) {
+			throw new RuntimeException("unexpected", e); // chain the cause so backend creation failures are diagnosable
+		}
+	}
+
+	void disposeKeyedStateBackend() {
+		if (keyedStateBackend != null) {
+			keyedStateBackend.dispose();
+			keyedStateBackend = null;
+		}
+	}
+
+	@Nonnull
+	KeyedStateHandle takeSnapshot() throws Exception {
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture =
+			keyedStateBackend.snapshot(682375462392L, 10L,
+				checkpointStorageLocation, CheckpointOptions.forCheckpointWithDefaultLocation());
+		if (!snapshotRunnableFuture.isDone()) {
+			snapshotRunnableFuture.run();
+		}
+		return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
+	}
+
+	void restoreSnapshot(@Nullable KeyedStateHandle snapshot) throws Exception {
+		Collection<KeyedStateHandle> restoreState =
+			snapshot == null ? null : new StateObjectCollection<>(Collections.singleton(snapshot));
+		keyedStateBackend.restore(restoreState);
+		if (snapshot != null) {
+			snapshot.discardState();
+		}
+	}
+
+	void setCurrentKey(String key) {
+		Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
+		keyedStateBackend.setCurrentKey(key);
+	}
+
+	@SuppressWarnings("unchecked")
+	<N, S extends State, V> S createState(
+		StateDescriptor<S, V> stateDescriptor,
+		@SuppressWarnings("SameParameterValue") N defaultNamespace) throws Exception {
+		S state = keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor);
+		((InternalKvState<?, N, ?>) state).setCurrentNamespace(defaultNamespace);
+		return state;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
deleted file mode 100644
index 5d9c682..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/** Test suite for {@link TtlAggregatingState}. */
-public class TtlAggregatingStateTest
-	extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
-	private static final long DEFAULT_ACCUMULATOR = 3L;
-
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.add(v);
-		getter = () -> ttlState.get();
-		originalGetter = () -> ttlState.original.get();
-
-		updateEmpty = 5;
-		updateUnexpired = 7;
-		updateExpired = 6;
-
-		getUpdateEmpty = "8";
-		getUnexpired = "15";
-		getUpdateExpired = "9";
-	}
-
-	@Override
-	TtlAggregatingState<?, String, Integer, Long, String> createState() {
-		AggregatingStateDescriptor<Integer, Long, String> aggregatingStateDes =
-			new AggregatingStateDescriptor<>("TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE);
-		return (TtlAggregatingState<?, String, Integer, Long, String>) wrapMockState(aggregatingStateDes);
-	}
-
-	@Override
-	String getMergeResult(
-		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
-		Set<String> namespaces = new HashSet<>();
-		unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
-		finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
-		return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) +
-			namespaces.size() * (int) DEFAULT_ACCUMULATOR);
-	}
-
-	private static final AggregateFunction<Integer, Long, String> AGGREGATE =
-		new AggregateFunction<Integer, Long, String>() {
-			@Override
-			public Long createAccumulator() {
-				return DEFAULT_ACCUMULATOR;
-			}
-
-			@Override
-			public Long add(Integer value, Long accumulator) {
-				return accumulator + value;
-			}
-
-			@Override
-			public String getResult(Long accumulator) {
-				return accumulator.toString();
-			}
-
-			@Override
-			public Long merge(Long a, Long b) {
-				return a + b;
-			}
-		};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
new file mode 100644
index 0000000..b19391a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Test context for {@link TtlAggregatingState}. */
+class TtlAggregatingStateTestContext
+	extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
+	private static final long DEFAULT_ACCUMULATOR = 3L;
+
+	@Override
+	void initTestValues() {
+		updateEmpty = 5;
+		updateUnexpired = 7;
+		updateExpired = 6;
+
+		getUpdateEmpty = "8";
+		getUnexpired = "15";
+		getUpdateExpired = "9";
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new AggregatingStateDescriptor<>(
+			"TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE);
+	}
+
+	@Override
+	void update(Integer value) throws Exception {
+		ttlState.add(value);
+	}
+
+	@Override
+	String get() throws Exception {
+		return ttlState.get();
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get();
+	}
+
+	@Override
+	String getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		Set<String> namespaces = new HashSet<>();
+		unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+		finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+		return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) +
+			namespaces.size() * (int) DEFAULT_ACCUMULATOR);
+	}
+
+	private static final AggregateFunction<Integer, Long, String> AGGREGATE =
+		new AggregateFunction<Integer, Long, String>() {
+			@Override
+			public Long createAccumulator() {
+				return DEFAULT_ACCUMULATOR;
+			}
+
+			@Override
+			public Long add(Integer value, Long accumulator) {
+				return accumulator + value;
+			}
+
+			@Override
+			public String getResult(Long accumulator) {
+				return accumulator.toString();
+			}
+
+			@Override
+			public Long merge(Long a, Long b) {
+				return a + b;
+			}
+		};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
deleted file mode 100644
index 8dac8ca..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlFoldingState}. */
-@SuppressWarnings("deprecation")
-public class TtlFoldingStateTest extends TtlStateTestBase<TtlFoldingState<?, String, Long, String>, Long, String> {
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.add(v);
-		getter = () -> ttlState.get();
-		originalGetter = () -> ttlState.original.get();
-
-		updateEmpty = 5L;
-		updateUnexpired = 7L;
-		updateExpired = 6L;
-
-		getUpdateEmpty = "6";
-		getUnexpired = "13";
-		getUpdateExpired = "7";
-	}
-
-	@Override
-	TtlFoldingState<?, String, Long, String> createState() {
-		FoldingStateDescriptor<Long, String> foldingStateDesc =
-			new FoldingStateDescriptor<>("TtlTestFoldingState", "1",  FOLD, StringSerializer.INSTANCE);
-		return (TtlFoldingState<?, String, Long, String>) wrapMockState(foldingStateDesc);
-	}
-
-	private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
-		long lacc = acc == null ? 0 : Long.parseLong(acc);
-		return Long.toString(val == null ? lacc : lacc + val);
-	};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
new file mode 100644
index 0000000..2b072b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test context for {@link TtlFoldingState}. */
+@SuppressWarnings("deprecation")
+class TtlFoldingStateTestContext extends TtlStateTestContextBase<TtlFoldingState<?, String, Long, String>, Long, String> {
+	@Override
+	void initTestValues() {
+		updateEmpty = 5L;
+		updateUnexpired = 7L;
+		updateExpired = 6L;
+
+		getUpdateEmpty = "6";
+		getUnexpired = "13";
+		getUpdateExpired = "7";
+	}
+
+	@Override
+	void update(Long value) throws Exception {
+		ttlState.add(value);
+	}
+
+	@Override
+	String get() throws Exception {
+		return ttlState.get();
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new FoldingStateDescriptor<>(
+			"TtlTestFoldingState", "1",  FOLD, StringSerializer.INSTANCE);
+	}
+
+	private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
+		long lacc = acc == null ? 0 : Long.parseLong(acc);
+		return Long.toString(val == null ? lacc : lacc + val);
+	};
+}