You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/12 18:19:49 UTC

[2/3] flink git commit: [FLINK-9701] Introduce TTL configuration in state descriptors

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
deleted file mode 100644
index 893f9ae..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.StreamSupport;
-
-/** Test suite for {@link TtlListState}. */
-public class TtlListStateTest
-	extends TtlMergingStateBase<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.addAll(v);
-		getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
-		originalGetter = () -> ttlState.original.get();
-
-		emptyValue = Collections.emptyList();
-
-		updateEmpty = Arrays.asList(5, 7, 10);
-		updateUnexpired = Arrays.asList(8, 9, 11);
-		updateExpired = Arrays.asList(1, 4);
-
-		getUpdateEmpty = updateEmpty;
-		getUnexpired = updateUnexpired;
-		getUpdateExpired = updateExpired;
-	}
-
-	@Override
-	TtlListState<?, String, Integer> createState() {
-		ListStateDescriptor<Integer> listStateDesc =
-			new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
-		return (TtlListState<?, String, Integer>) wrapMockState(listStateDesc);
-	}
-
-	@Override
-	List<Integer> generateRandomUpdate() {
-		int size = RANDOM.nextInt(5);
-		return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
-	}
-
-	@Override
-	Iterable<Integer> getMergeResult(
-		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
-		List<Integer> result = new ArrayList<>();
-		finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
-		return result;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
new file mode 100644
index 0000000..c113bf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/** Test suite for {@link TtlListState}. */
+class TtlListStateTestContext
+	extends TtlMergingStateTestContext<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
+	@Override
+	void initTestValues() {
+		emptyValue = Collections.emptyList();
+
+		updateEmpty = Arrays.asList(5, 7, 10);
+		updateUnexpired = Arrays.asList(8, 9, 11);
+		updateExpired = Arrays.asList(1, 4);
+
+		getUpdateEmpty = updateEmpty;
+		getUnexpired = updateUnexpired;
+		getUpdateExpired = updateExpired;
+	}
+
+	@Override
+	void update(List<Integer> value) throws Exception {
+		ttlState.addAll(value);
+	}
+
+	@Override
+	Iterable<Integer> get() throws Exception {
+		return StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get() == null ? emptyValue : ttlState.original.get();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
+	}
+
+	@Override
+	List<Integer> generateRandomUpdate() {
+		int size = RANDOM.nextInt(5);
+		return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
+	}
+
+	@Override
+	Iterable<Integer> getMergeResult(
+		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
+		List<Integer> result = new ArrayList<>();
+		finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
+		return result;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
new file mode 100644
index 0000000..7fd61aa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Test suite for collection methods of {@link TtlMapState}. */
+class TtlMapStateAllEntriesTestContext extends
+	TtlMapStateTestContext<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
+
+	@Override
+	void initTestValues() {
+		emptyValue = Collections.emptySet();
+
+		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
+		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
+		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+
+		getUpdateEmpty = updateEmpty.entrySet();
+		getUnexpired = updateUnexpired.entrySet();
+		getUpdateExpired = updateExpired.entrySet();
+	}
+
+	@SafeVarargs
+	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
+		return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+	}
+
+	@Override
+	void update(Map<Integer, String> map) throws Exception {
+		ttlState.putAll(map);
+	}
+
+	@Override
+	Set<Map.Entry<Integer, String>> get() throws Exception {
+		return StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.entries() == null ? Collections.emptySet() : ttlState.original.entries();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
deleted file mode 100644
index d6949e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-/** Test suite for per element methods of {@link TtlMapState}. */
-public class TtlMapStatePerElementTest extends TtlMapStateTestBase<String, String> {
-	private static final int TEST_KEY = 1;
-	private static final String TEST_VAL1 = "test value1";
-	private static final String TEST_VAL2 = "test value2";
-	private static final String TEST_VAL3 = "test value3";
-
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.put(TEST_KEY, v);
-		getter = () -> ttlState.get(TEST_KEY);
-		originalGetter = () -> ttlState.original.get(TEST_KEY);
-
-		updateEmpty = TEST_VAL1;
-		updateUnexpired = TEST_VAL2;
-		updateExpired = TEST_VAL3;
-
-		getUpdateEmpty = TEST_VAL1;
-		getUnexpired = TEST_VAL2;
-		getUpdateExpired = TEST_VAL3;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
new file mode 100644
index 0000000..fb025af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Test suite for per element methods of {@link TtlMapState}. */
+class TtlMapStatePerElementTestContext extends TtlMapStateTestContext<String, String> {
+	private static final int TEST_KEY = 1;
+	private static final String TEST_VAL1 = "test value1";
+	private static final String TEST_VAL2 = "test value2";
+	private static final String TEST_VAL3 = "test value3";
+
+	@Override
+	void initTestValues() {
+		updateEmpty = TEST_VAL1;
+		updateUnexpired = TEST_VAL2;
+		updateExpired = TEST_VAL3;
+
+		getUpdateEmpty = TEST_VAL1;
+		getUnexpired = TEST_VAL2;
+		getUpdateExpired = TEST_VAL3;
+	}
+
+	@Override
+	void update(String value) throws Exception {
+		ttlState.put(TEST_KEY, value);
+	}
+
+	@Override
+	String get() throws Exception {
+		return ttlState.get(TEST_KEY);
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get(TEST_KEY);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
deleted file mode 100644
index bac2f41..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/** Test suite for collection methods of {@link TtlMapState}. */
-public class TtlMapStateTest extends
-	TtlMapStateTestBase<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
-
-	@Override
-	void initTestValues() {
-		updater = map -> ttlState.putAll(map);
-		getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
-		originalGetter = () -> ttlState.original.entries();
-
-		emptyValue = Collections.emptySet();
-
-		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
-		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
-		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
-
-		getUpdateEmpty = updateEmpty.entrySet();
-		getUnexpired = updateUnexpired.entrySet();
-		getUpdateExpired = updateExpired.entrySet();
-	}
-
-	@SafeVarargs
-	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
-		return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
deleted file mode 100644
index dab3194..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-abstract class TtlMapStateTestBase<UV, GV>
-	extends TtlStateTestBase<TtlMapState<?, String, Integer, String>, UV, GV> {
-	@Override
-	TtlMapState<?, String, Integer, String> createState() {
-		MapStateDescriptor<Integer, String> mapStateDesc =
-			new MapStateDescriptor<>("TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
-		return (TtlMapState<?, String, Integer, String>) wrapMockState(mapStateDesc);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
new file mode 100644
index 0000000..bb7b327
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+abstract class TtlMapStateTestContext<UV, GV>
+	extends TtlStateTestContextBase<TtlMapState<?, String, Integer, String>, UV, GV> {
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new MapStateDescriptor<>(
+			"TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
deleted file mode 100644
index 6a7aebe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.internal.InternalMergingState;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-abstract class TtlMergingStateBase<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
-	extends TtlStateTestBase<S, UV, GV> {
-	static final Random RANDOM = new Random();
-
-	private static final List<String> NAMESPACES = Arrays.asList(
-		"unsetNamespace1",
-		"unsetNamespace2",
-		"expiredNamespace",
-		"expiredAndUpdatedNamespace",
-		"unexpiredNamespace",
-		"finalNamespace");
-
-	@Test
-	public void testMergeNamespaces() throws Exception {
-		initTest();
-
-		timeProvider.time = 0;
-		List<Tuple2<String, UV>> expiredUpdatesToMerge = generateExpiredUpdatesToMerge();
-		applyStateUpdates(expiredUpdatesToMerge);
-
-		timeProvider.time = 120;
-		List<Tuple2<String, UV>> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge();
-		applyStateUpdates(unexpiredUpdatesToMerge);
-
-		timeProvider.time = 150;
-		List<Tuple2<String, UV>> finalUpdatesToMerge = generateFinalUpdatesToMerge();
-		applyStateUpdates(finalUpdatesToMerge);
-
-		timeProvider.time = 230;
-		ttlState.mergeNamespaces("targetNamespace", NAMESPACES);
-		ttlState.setCurrentNamespace("targetNamespace");
-		assertEquals("Unexpected result of merge operation",
-			getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get());
-	}
-
-	private List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
-		return Arrays.asList(
-			Tuple2.of("expiredNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
-		);
-	}
-
-	private List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
-		return Arrays.asList(
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate())
-		);
-	}
-
-	private List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
-		return Arrays.asList(
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
-			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
-			Tuple2.of("finalNamespace", generateRandomUpdate()),
-			Tuple2.of("finalNamespace", generateRandomUpdate())
-		);
-	}
-
-	abstract UV generateRandomUpdate();
-
-	private void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
-		for (Tuple2<String, UV> t : updates) {
-			ttlState.setCurrentNamespace(t.f0);
-			updater.accept(t.f1);
-		}
-	}
-
-	abstract GV getMergeResult(
-		List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, UV>> finalUpdatesToMerge);
-
-	@SuppressWarnings("unchecked")
-	abstract static class TtlIntegerMergingStateBase<
-		S extends InternalMergingState<?, String, ?, ?, GV>,
-		UV extends Number, GV>
-		extends TtlMergingStateBase<S, UV, GV> {
-		@Override
-		UV generateRandomUpdate() {
-			return (UV) (Integer) RANDOM.nextInt(1000);
-		}
-
-		int getIntegerMergeResult(
-			List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
-			List<Tuple2<String, UV>> finalUpdatesToMerge) {
-			return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
-				finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
new file mode 100644
index 0000000..85c6134
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+abstract class TtlMergingStateTestContext<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
+	extends TtlStateTestContextBase<S, UV, GV> {
+	static final Random RANDOM = new Random();
+
+	static final List<String> NAMESPACES = Arrays.asList(
+		"unsetNamespace1",
+		"unsetNamespace2",
+		"expiredNamespace",
+		"expiredAndUpdatedNamespace",
+		"unexpiredNamespace",
+		"finalNamespace");
+
+	List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
+		);
+	}
+
+	List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate())
+		);
+	}
+
+	List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate())
+		);
+	}
+
+	abstract UV generateRandomUpdate();
+
+	void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
+		for (Tuple2<String, UV> t : updates) {
+			ttlState.setCurrentNamespace(t.f0);
+			update(t.f1);
+		}
+	}
+
+	abstract GV getMergeResult(
+		List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, UV>> finalUpdatesToMerge);
+
+	@SuppressWarnings("unchecked")
+	abstract static class TtlIntegerMergingStateTestContext<
+		S extends InternalMergingState<?, String, ?, ?, GV>,
+		UV extends Number, GV>
+		extends TtlMergingStateTestContext<S, UV, GV> {
+		@Override
+		UV generateRandomUpdate() {
+			return (UV) (Integer) RANDOM.nextInt(1000);
+		}
+
+		int getIntegerMergeResult(
+			List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+			List<Tuple2<String, UV>> finalUpdatesToMerge) {
+			return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
+				finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
deleted file mode 100644
index bc5f67f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.List;
-
-/** Test suite for {@link TtlReducingState}. */
-public class TtlReducingStateTest
-	extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlReducingState<?, String, Integer>, Integer, Integer> {
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.add(v);
-		getter = () -> ttlState.get();
-		originalGetter = () -> ttlState.original.get();
-
-		updateEmpty = 5;
-		updateUnexpired = 7;
-		updateExpired = 6;
-
-		getUpdateEmpty = 5;
-		getUnexpired = 12;
-		getUpdateExpired = 6;
-	}
-
-	@Override
-	TtlReducingState<?, String, Integer> createState() {
-		ReducingStateDescriptor<Integer> aggregatingStateDes =
-			new ReducingStateDescriptor<>("TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
-		return (TtlReducingState<?, String, Integer>) wrapMockState(aggregatingStateDes);
-	}
-
-	@Override
-	Integer getMergeResult(
-		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
-		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
-		return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
-	}
-
-	private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
-		if (v1 == null && v2 == null) {
-			return null;
-		} else if (v1 == null) {
-			return v2;
-		} else if (v2 == null) {
-			return v1;
-		} else {
-			return v1 + v2;
-		}
-	};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
new file mode 100644
index 0000000..ea5e61b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.List;
+
+/** Test suite for {@link TtlReducingState}. */
+class TtlReducingStateTestContext
+	extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlReducingState<?, String, Integer>, Integer, Integer> {
+	@Override
+	void initTestValues() {
+		updateEmpty = 5;
+		updateUnexpired = 7;
+		updateExpired = 6;
+
+		getUpdateEmpty = 5;
+		getUnexpired = 12;
+		getUpdateExpired = 6;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ReducingStateDescriptor<>(
+			"TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
+	}
+
+	@Override
+	void update(Integer value) throws Exception {
+		ttlState.add(value);
+	}
+
+	@Override
+	Integer get() throws Exception {
+		return ttlState.get();
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get();
+	}
+
+	@Override
+	Integer getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
+	}
+
+	private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
+		if (v1 == null && v2 == null) {
+			return null;
+		} else if (v1 == null) {
+			return v2;
+		} else if (v2 == null) {
+			return v1;
+		} else {
+			return v1 + v2;
+		}
+	};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index bc3d6e7..5820f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -22,105 +22,153 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.function.SupplierWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.StateMigrationException;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeThat;
 
-abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+/** State TTL base test suite. */
+@RunWith(Parameterized.class)
+public abstract class TtlStateTestBase {
 	private static final long TTL = 100;
-	private static final KeyedStateFactory MOCK_ORIGINAL_STATE_FACTORY = new MockKeyedStateFactory();
 
-	S ttlState;
-	MockTimeProvider timeProvider;
-	StateTtlConfiguration ttlConfig;
+	private MockTtlTimeProvider timeProvider;
+	private StateBackendTestContext sbetc;
+	private StateTtlConfiguration ttlConfig;
 
-	ThrowingConsumer<UV, Exception> updater;
-	SupplierWithException<GV, Exception> getter;
-	SupplierWithException<?, Exception> originalGetter;
+	@Before
+	public void setup() {
+		timeProvider = new MockTtlTimeProvider();
+		sbetc = createStateBackendTestContext(timeProvider);
+	}
 
-	UV updateEmpty;
-	UV updateUnexpired;
-	UV updateExpired;
+	protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider);
+
+	@Parameterized.Parameter
+	public TtlStateTestContextBase<?, ?, ?> ctx;
+
+	@Parameterized.Parameters(name = "{0}")
+	public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
+		return Arrays.asList(
+			new TtlValueStateTestContext(),
+			new TtlListStateTestContext(),
+			new TtlMapStateAllEntriesTestContext(),
+			new TtlMapStatePerElementTestContext(),
+			new TtlAggregatingStateTestContext(),
+			new TtlReducingStateTestContext(),
+			new TtlFoldingStateTestContext());
+	}
 
-	GV getUpdateEmpty;
-	GV getUnexpired;
-	GV getUpdateExpired;
+	@SuppressWarnings("unchecked")
+	private <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
+		return (TtlStateTestContextBase<S, UV, ?>) ctx;
+	}
 
-	GV emptyValue = null;
+	@SuppressWarnings("unchecked")
+	private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
+		return (TtlMergingStateTestContext<?, UV, ?>) ctx;
+	}
 
-	void initTest() {
+	private void initTest() throws Exception {
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 	}
 
-	private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
+	private void initTest(
+				StateTtlConfiguration.TtlUpdateType updateType,
+				StateTtlConfiguration.TtlStateVisibility visibility) throws Exception {
 		initTest(updateType, visibility, TTL);
 	}
 
-	private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
-		timeProvider = new MockTimeProvider();
-		StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
-		ttlConfigBuilder.setTtlUpdateType(updateType)
-						.setStateVisibility(visibility)
-						.setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
-						.setTtl(Time.milliseconds(ttl));
-		ttlConfig = ttlConfigBuilder.build();
-		ttlState = createState();
-		initTestValues();
+	private void initTest(
+		StateTtlConfiguration.TtlUpdateType updateType,
+		StateTtlConfiguration.TtlStateVisibility visibility,
+		long ttl) throws Exception {
+		ttlConfig = StateTtlConfiguration
+			.newBuilder(Time.milliseconds(ttl))
+			.setTtlUpdateType(updateType)
+			.setStateVisibility(visibility)
+			.build();
+		sbetc.createAndRestoreKeyedStateBackend();
+		sbetc.restoreSnapshot(null);
+		createState();
+		ctx().initTestValues();
 	}
 
-	abstract S createState();
-
-	<SV, US extends State, IS extends US> IS wrapMockState(StateDescriptor<IS, SV> stateDesc) {
-		try {
-			return TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
-				StringSerializer.INSTANCE, stateDesc,
-				MOCK_ORIGINAL_STATE_FACTORY, ttlConfig, timeProvider);
-		} catch (Exception e) {
-			throw new FlinkRuntimeException("Unexpected exception wrapping mock state", e);
-		}
+	@SuppressWarnings("unchecked")
+	private <S extends State> void createState() throws Exception {
+		StateDescriptor<S, Object> stateDescriptor = ctx().createStateDescriptor();
+		stateDescriptor.enableTimeToLive(ttlConfig);
+		ctx().ttlState =
+			(InternalKvState<?, String, ?>) sbetc.createState(stateDescriptor, "defaultNamespace");
 	}
 
-	abstract void initTestValues();
+	private void takeAndRestoreSnapshot() throws Exception {
+		KeyedStateHandle snapshot = sbetc.takeSnapshot();
+		sbetc.createAndRestoreKeyedStateBackend();
+		sbetc.restoreSnapshot(snapshot);
+		createState();
+	}
 
 	@Test
 	public void testNonExistentValue() throws Exception {
 		initTest();
-		assertEquals("Non-existing state should be empty", emptyValue, getter.get());
+		assertEquals("Non-existing state should be empty", ctx().emptyValue, ctx().get());
 	}
 
 	@Test
 	public void testExactExpirationOnWrite() throws Exception {
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 
+		takeAndRestoreSnapshot();
+
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 20;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		updater.accept(updateUnexpired);
+		ctx().update(ctx().updateUnexpired);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 120;
-		assertEquals("Unexpired state should be available after update", getUnexpired, getter.get());
+		assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 170;
-		updater.accept(updateExpired);
+		ctx().update(ctx().updateExpired);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 220;
-		assertEquals("Unexpired state should be available after update", getUpdateExpired, getter.get());
+		assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 300;
-		assertEquals("Expired state should be unavailable", emptyValue, getter.get());
-		assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
 	}
 
 	@Test
@@ -128,11 +176,14 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
 
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 120;
-		assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
-		assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+		assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+		assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
+		assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
 	}
 
 	@Test
@@ -140,17 +191,23 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 120;
-		assertEquals("Unexpired state should be available after read", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available after read", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 250;
-		assertEquals("Expired state should be unavailable", emptyValue, getter.get());
-		assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
 	}
 
 	@Test
@@ -158,14 +215,18 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
 
 		timeProvider.time = 0;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 170;
-		assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
-		assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+		assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+		assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
 	}
 
 	@Test
@@ -173,9 +234,154 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
 
 		timeProvider.time = 10;
-		updater.accept(updateEmpty);
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 50;
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+	}
+
+	@Test
+	public void testMergeNamespaces() throws Exception {
+		assumeThat(ctx, instanceOf(TtlMergingStateTestContext.class));
+
+		initTest();
+
+		timeProvider.time = 0;
+		List<Tuple2<String, Object>> expiredUpdatesToMerge = mctx().generateExpiredUpdatesToMerge();
+		mctx().applyStateUpdates(expiredUpdatesToMerge);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 120;
+		List<Tuple2<String, Object>> unexpiredUpdatesToMerge = mctx().generateUnexpiredUpdatesToMerge();
+		mctx().applyStateUpdates(unexpiredUpdatesToMerge);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 150;
+		List<Tuple2<String, Object>> finalUpdatesToMerge = mctx().generateFinalUpdatesToMerge();
+		mctx().applyStateUpdates(finalUpdatesToMerge);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 230;
+		mctx().ttlState.mergeNamespaces("targetNamespace", TtlMergingStateTestContext.NAMESPACES);
+		mctx().ttlState.setCurrentNamespace("targetNamespace");
+		assertEquals("Unexpected result of merge operation",
+			mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), mctx().get());
+	}
+
+	@Test
+	public void testMultipleKeys() throws Exception {
+		testMultipleStateIds(id -> sbetc.setCurrentKey(id));
+	}
+
+	@Test
+	public void testMultipleNamespaces() throws Exception {
+		testMultipleStateIds(id -> ctx().ttlState.setCurrentNamespace(id));
+	}
+
+	private void testMultipleStateIds(Consumer<String> idChanger) throws Exception {
+		initTest();
+
+		timeProvider.time = 0;
+		idChanger.accept("id2");
+		ctx().update(ctx().updateEmpty);
+
+		takeAndRestoreSnapshot();
 
 		timeProvider.time = 50;
-		assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+		idChanger.accept("id1");
+		ctx().update(ctx().updateEmpty);
+		idChanger.accept("id2");
+		ctx().update(ctx().updateUnexpired);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 120;
+		idChanger.accept("id1");
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+		idChanger.accept("id2");
+		assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 170;
+		idChanger.accept("id2");
+		ctx().update(ctx().updateExpired);
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 230;
+		idChanger.accept("id1");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		idChanger.accept("id2");
+		assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+		takeAndRestoreSnapshot();
+
+		timeProvider.time = 300;
+		idChanger.accept("id1");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		idChanger.accept("id2");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+	}
+
+	@Test
+	public void testSnapshotChangeRestore() throws Exception {
+		initTest();
+
+		timeProvider.time = 0;
+		sbetc.setCurrentKey("k1");
+		ctx().update(ctx().updateEmpty);
+
+		timeProvider.time = 50;
+		sbetc.setCurrentKey("k1");
+		ctx().update(ctx().updateUnexpired);
+
+		timeProvider.time = 100;
+		sbetc.setCurrentKey("k2");
+		ctx().update(ctx().updateEmpty);
+
+		KeyedStateHandle snapshot = sbetc.takeSnapshot();
+
+		timeProvider.time = 170;
+		sbetc.setCurrentKey("k1");
+		ctx().update(ctx().updateExpired);
+		sbetc.setCurrentKey("k2");
+		ctx().update(ctx().updateUnexpired);
+
+		sbetc.createAndRestoreKeyedStateBackend();
+		sbetc.restoreSnapshot(snapshot);
+		createState();
+
+		timeProvider.time = 180;
+		sbetc.setCurrentKey("k1");
+		assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+		sbetc.setCurrentKey("k2");
+		assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+	}
+
+	@Test(expected = StateMigrationException.class)
+	public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
+		assumeThat(this, not(instanceOf(MockTtlStateTest.class)));
+
+		initTest();
+
+		timeProvider.time = 0;
+		ctx().update(ctx().updateEmpty);
+
+		KeyedStateHandle snapshot = sbetc.takeSnapshot();
+		sbetc.createAndRestoreKeyedStateBackend();
+
+		sbetc.restoreSnapshot(snapshot);
+		sbetc.createState(ctx().createStateDescriptor(), "");
+	}
+
+	@After
+	public void tearDown() {
+		sbetc.disposeKeyedStateBackend();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
new file mode 100644
index 0000000..d40bfb0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+abstract class TtlStateTestContextBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+	S ttlState;
+
+	UV updateEmpty;
+	UV updateUnexpired;
+	UV updateExpired;
+
+	GV getUpdateEmpty;
+	GV getUnexpired;
+	GV getUpdateExpired;
+
+	GV emptyValue = null;
+
+	abstract void initTestValues();
+
+	abstract <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor();
+
+	abstract void update(UV value) throws Exception;
+
+	abstract GV get() throws Exception;
+
+	abstract Object getOriginal() throws Exception;
+
+	@Override
+	public String toString() {
+		return this.getClass().getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
deleted file mode 100644
index 8d9a4b4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlValueState}. */
-public class TtlValueStateTest extends TtlStateTestBase<TtlValueState<?, String, String>, String, String> {
-	private static final String TEST_VAL1 = "test value1";
-	private static final String TEST_VAL2 = "test value2";
-	private static final String TEST_VAL3 = "test value3";
-
-	@Override
-	void initTestValues() {
-		updater = v -> ttlState.update(v);
-		getter = () -> ttlState.value();
-		originalGetter = () -> ttlState.original.value();
-
-		updateEmpty = TEST_VAL1;
-		updateUnexpired = TEST_VAL2;
-		updateExpired = TEST_VAL3;
-
-		getUpdateEmpty = TEST_VAL1;
-		getUnexpired = TEST_VAL2;
-		getUpdateExpired = TEST_VAL3;
-	}
-
-	@Override
-	TtlValueState<?, String, String> createState() {
-		ValueStateDescriptor<String> valueStateDesc =
-			new ValueStateDescriptor<>("TtlValueTestState", StringSerializer.INSTANCE);
-		return (TtlValueState<?, String, String>) wrapMockState(valueStateDesc);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
new file mode 100644
index 0000000..976d891
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test suite for {@link TtlValueState}. */
+class TtlValueStateTestContext extends TtlStateTestContextBase<TtlValueState<?, String, String>, String, String> {
+	private static final String TEST_VAL1 = "test value1";
+	private static final String TEST_VAL2 = "test value2";
+	private static final String TEST_VAL3 = "test value3";
+
+	@Override
+	void initTestValues() {
+		updateEmpty = TEST_VAL1;
+		updateUnexpired = TEST_VAL2;
+		updateExpired = TEST_VAL3;
+
+		getUpdateEmpty = TEST_VAL1;
+		getUnexpired = TEST_VAL2;
+		getUpdateExpired = TEST_VAL3;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ValueStateDescriptor<>(
+			"TtlValueTestState", StringSerializer.INSTANCE);
+	}
+
+	@Override
+	void update(String value) throws Exception {
+		ttlState.update(value);
+	}
+
+	@Override
+	String get() throws Exception {
+		return ttlState.value();
+	}
+
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.value();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
index 439ca7f2..3f94669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
@@ -21,14 +21,12 @@ package org.apache.flink.runtime.state.ttl.mock;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Supplier;
 
 /** In memory mock internal state base class. */
-class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
-	private Map<N, T> namespacedValues = new HashMap<>();
-	private T defaultNamespaceValue;
+abstract class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
+	Supplier<Map<Object, Object>> values;
 	private N currentNamespace;
 	private final Supplier<T> emptyValue;
 
@@ -38,7 +36,6 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
 
 	MockInternalKvState(Supplier<T> emptyValue) {
 		this.emptyValue = emptyValue;
-		defaultNamespaceValue = emptyValue.get();
 	}
 
 	@Override
@@ -72,26 +69,20 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
 
 	@Override
 	public void clear() {
-		if (currentNamespace == null) {
-			defaultNamespaceValue = emptyValue.get();
-		} else {
-			namespacedValues.remove(currentNamespace);
-		}
+		getCurrentKeyValues().remove(currentNamespace);
 	}
 
+	@SuppressWarnings("unchecked")
 	public T getInternal() {
-		T value = currentNamespace == null ? defaultNamespaceValue :
-			namespacedValues.getOrDefault(currentNamespace, emptyValue.get());
-		updateInternal(value);
-		return value;
+		return (T) getCurrentKeyValues().computeIfAbsent(currentNamespace, n -> emptyValue.get());
 	}
 
 	@SuppressWarnings("WeakerAccess")
 	public void updateInternal(T valueToStore) {
-		if (currentNamespace == null) {
-			defaultNamespaceValue = valueToStore;
-		} else {
-			namespacedValues.put(currentNamespace, valueToStore);
-		}
+		getCurrentKeyValues().put(currentNamespace, valueToStore);
+	}
+
+	private Map<Object, Object> getCurrentKeyValues() {
+		return values.get();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 386ef97..9b5ac10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -68,21 +68,17 @@ public class MockInternalMapState<K, N, UK, UV>
 
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() {
-		return copy().entrySet();
-	}
-
-	private Map<UK, UV> copy() {
-		return new HashMap<>(getInternal());
+		return getInternal().entrySet();
 	}
 
 	@Override
 	public Iterable<UK> keys() {
-		return copy().keySet();
+		return getInternal().keySet();
 	}
 
 	@Override
 	public Iterable<UV> values() {
-		return copy().values();
+		return getInternal().values();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
new file mode 100644
index 0000000..363ecf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** State backend which produces in memory mock state objects. */
+public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+	@SuppressWarnings("deprecation")
+	private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
+		Stream.of(
+			Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
+			Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
+			Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
+			Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
+			Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
+			Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
+		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+	private final Map<String, Map<K, Map<Object, Object>>> stateValues = new HashMap<>();
+
+	MockKeyedStateBackend(
+		TaskKvStateRegistry kvStateRegistry,
+		TypeSerializer<K> keySerializer,
+		ClassLoader userCodeClassLoader,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		ExecutionConfig executionConfig,
+		TtlTimeProvider ttlTimeProvider) {
+		super(kvStateRegistry, keySerializer, userCodeClassLoader,
+			numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public <N, SV, S extends State, IS extends S> IS createInternalState(
+		TypeSerializer<N> namespaceSerializer,
+		StateDescriptor<S, SV> stateDesc) throws Exception {
+		KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+		if (stateFactory == null) {
+			String message = String.format("State %s is not supported by %s",
+				stateDesc.getClass(), TtlStateFactory.class);
+			throw new FlinkRuntimeException(message);
+		}
+		IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
+		((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
+			.computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
+			.computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
+		return state;
+	}
+
+	@Override
+	public int numStateEntries() {
+		int count = 0;
+		for (String state : stateValues.keySet()) {
+			for (K key : stateValues.get(state).keySet()) {
+				count += stateValues.get(state).get(key).size();
+			}
+		}
+		return count;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		// noop
+	}
+
+	@Override
+	public <N> Stream<K> getKeys(String state, N namespace) {
+		return stateValues.get(state).entrySet().stream()
+			.filter(e -> e.getValue().containsKey(namespace))
+			.map(Map.Entry::getKey);
+	}
+
+	@Override
+	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+		long checkpointId,
+		long timestamp,
+		CheckpointStreamFactory streamFactory,
+		CheckpointOptions checkpointOptions) {
+		return new FutureTask<>(() -> SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues))));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void restore(Collection<KeyedStateHandle> state) {
+		stateValues.clear();
+		state = state == null ? Collections.emptyList() : state;
+		state.forEach(ksh -> stateValues.putAll(copy(((MockKeyedStateHandle<K>) ksh).snapshotStates)));
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <K> Map<String, Map<K, Map<Object, Object>>> copy(
+		Map<String, Map<K, Map<Object, Object>>> stateValues) {
+		Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>();
+		for (String stateName : stateValues.keySet()) {
+			Map<K, Map<Object, Object>> keyedValues = snapshotStates.computeIfAbsent(stateName, s -> new HashMap<>());
+			for (K key : stateValues.get(stateName).keySet()) {
+				Map<Object, Object> values = keyedValues.computeIfAbsent(key, s -> new HashMap<>());
+				for (Object namespace : stateValues.get(stateName).get(key).keySet()) {
+					Object value = stateValues.get(stateName).get(key).get(namespace);
+					value = value instanceof List ? new ArrayList<>((List) value) : value;
+					value = value instanceof Map ? new HashMap<>((Map) value) : value;
+					values.put(namespace, value);
+				}
+			}
+		}
+		return snapshotStates;
+	}
+
+	@Nonnull
+	@Override
+	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T>
+	create(
+		@Nonnull String stateName,
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+		@Nonnull PriorityComparator<T> elementPriorityComparator,
+		@Nonnull KeyExtractorFunction<T> keyExtractor) {
+		return new HeapPriorityQueueSet<>(
+			elementPriorityComparator,
+			keyExtractor,
+			0,
+			keyGroupRange,
+			0);
+	}
+
+	private static class MockKeyedStateHandle<K> implements KeyedStateHandle {
+		private static final long serialVersionUID = 1L;
+
+		final Map<String, Map<K, Map<Object, Object>>> snapshotStates;
+
+		MockKeyedStateHandle(Map<String, Map<K, Map<Object, Object>>> snapshotStates) {
+			this.snapshotStates = snapshotStates;
+		}
+
+		@Override
+		public void discardState() {
+			snapshotStates.clear();
+		}
+
+		@Override
+		public long getStateSize() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void registerSharedStates(SharedStateRegistry stateRegistry) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public KeyGroupRange getKeyGroupRange() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
deleted file mode 100644
index d843352..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl.mock;
-
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyedStateFactory;
-import org.apache.flink.runtime.state.ttl.TtlStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** State factory which produces in memory mock state objects. */
-public class MockKeyedStateFactory implements KeyedStateFactory {
-	@SuppressWarnings("deprecation")
-	private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
-		Stream.of(
-			Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
-			Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
-			Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
-			Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
-			Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
-			Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
-		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-
-	@Override
-	public <N, SV, S extends State, IS extends S> IS createState(
-		TypeSerializer<N> namespaceSerializer,
-		StateDescriptor<S, SV> stateDesc) throws Exception {
-		KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
-		if (stateFactory == null) {
-			String message = String.format("State %s is not supported by %s",
-				stateDesc.getClass(), TtlStateFactory.class);
-			throw new FlinkRuntimeException(message);
-		}
-		return stateFactory.createState(namespaceSerializer, stateDesc);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
new file mode 100644
index 0000000..a8d49dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nullable;
+
+/** mack state backend. */
+public class MockStateBackend extends AbstractStateBackend {
+	@Override
+	public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CheckpointStorage createCheckpointStorage(JobID jobId) {
+		return new CheckpointStorage() {
+			@Override
+			public boolean supportsHighlyAvailableStorage() {
+				return false;
+			}
+
+			@Override
+			public boolean hasDefaultSavepointLocation() {
+				return false;
+			}
+
+			@Override
+			public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
+				return null;
+			}
+
+			@Override
+			public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+				return null;
+			}
+		};
+	}
+
+	@Override
+	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+		Environment env,
+		JobID jobID,
+		String operatorIdentifier,
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		TaskKvStateRegistry kvStateRegistry,
+		TtlTimeProvider ttlTimeProvider) {
+		return new MockKeyedStateBackend<>(
+			new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()),
+			keySerializer,
+			env.getUserClassLoader(),
+			numberOfKeyGroups,
+			keyGroupRange,
+			env.getExecutionConfig(),
+			ttlTimeProvider);
+	}
+
+	@Override
+	public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6b4e3e9..3bf0aea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
 import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
@@ -270,10 +271,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		boolean enableIncrementalCheckpointing,
 		LocalRecoveryConfig localRecoveryConfig,
-		RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType
+		RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
+		TtlTimeProvider ttlTimeProvider
 	) throws IOException {
 
-		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
+		super(kvStateRegistry, keySerializer, userCodeClassLoader,
+			numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
 
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 
@@ -1347,7 +1350,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	public <N, SV, S extends State, IS extends S> IS createState(
+	public <N, SV, S extends State, IS extends S> IS createInternalState(
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc) throws Exception {
 		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 998521b..1794e17 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TernaryBoolean;
 
@@ -410,7 +411,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry) throws IOException {
+			TaskKvStateRegistry kvStateRegistry,
+			TtlTimeProvider ttlTimeProvider) throws IOException {
 
 		// first, make sure that the RocksDB JNI library is loaded
 		// we do this explicitly here to have better error handling
@@ -442,7 +444,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 				env.getExecutionConfig(),
 				isIncrementalCheckpointsEnabled(),
 				localRecoveryConfig,
-				priorityQueueStateType);
+				priorityQueueStateType,
+				ttlTimeProvider);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 69069d6..6b254ce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 
@@ -241,7 +242,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 				new ExecutionConfig(),
 				enableIncrementalCheckpointing,
 				TestLocalRecoveryConfig.disabled(),
-				RocksDBStateBackend.PriorityQueueStateType.HEAP);
+				RocksDBStateBackend.PriorityQueueStateType.HEAP,
+				TtlTimeProvider.DEFAULT);
 
 			verify(columnFamilyOptions, Mockito.times(1))
 				.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);