You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/20 08:26:58 UTC

flink git commit: [FLINK-9858][tests] State TTL End-to-End Test

Repository: flink
Updated Branches:
  refs/heads/master 46334e2f3 -> 01cf808ee


[FLINK-9858][tests] State TTL End-to-End Test

This closes #6361.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01cf808e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01cf808e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01cf808e

Branch: refs/heads/master
Commit: 01cf808ee863f4b0f429f20ba05fd2e322842e48
Parents: 46334e2
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Fri Jul 13 19:27:35 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 20 10:26:28 2018 +0200

----------------------------------------------------------------------
 .../flink-stream-state-ttl-test/pom.xml         | 104 +++++++++++
 .../tests/DataStreamStateTTLTestProgram.java    | 100 ++++++++++
 .../flink/streaming/tests/TtlStateUpdate.java   |  45 +++++
 .../streaming/tests/TtlStateUpdateSource.java   |  78 ++++++++
 .../tests/TtlVerifyUpdateFunction.java          | 187 +++++++++++++++++++
 .../tests/verify/AbstractTtlStateVerifier.java  | 105 +++++++++++
 .../verify/TtlAggregatingStateVerifier.java     | 107 +++++++++++
 .../tests/verify/TtlFoldingStateVerifier.java   |  87 +++++++++
 .../tests/verify/TtlListStateVerifier.java      |  78 ++++++++
 .../tests/verify/TtlMapStateVerifier.java       |  94 ++++++++++
 .../tests/verify/TtlReducingStateVerifier.java  |  86 +++++++++
 .../tests/verify/TtlStateVerifier.java          |  60 ++++++
 .../tests/verify/TtlUpdateContext.java          |  71 +++++++
 .../tests/verify/TtlValueStateVerifier.java     |  66 +++++++
 .../tests/verify/TtlVerificationContext.java    |  69 +++++++
 .../streaming/tests/verify/ValueWithTs.java     | 107 +++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   3 +
 flink-end-to-end-tests/test-scripts/common.sh   |  37 ++--
 .../test-scripts/test_stream_state_ttl.sh       |  83 ++++++++
 20 files changed, 1555 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
new file mode 100644
index 0000000..ad04f3e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.7-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-stream-state-ttl-test</artifactId>
+	<name>flink-stream-state-ttl-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-datastream-allround-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>DataStreamStateTTLTestProgram</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.DataStreamStateTTLTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
new file mode 100644
index 0000000..f4c9619
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+/**
+ * A test job for State TTL feature.
+ *
+ * <p>The test pipeline does the following:
+ * - generates random keyed state updates for each state TTL verifier (state type)
+ * - performs update of created state with TTL for each verifier
+ * - keeps previous updates in other state
+ * - verifies expected result of last update against preserved history of updates
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>update_generator_source.keyspace (int, default - 100): Number of different keys for updates emitted by the update generator.</li>
+ *     <li>update_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting updates in the update generator. Set to 0 to disable sleeping.</li>
+ *     <li>update_generator_source.sleep_after_elements (long, default - 0): Number of updates to emit before sleeping in the update generator. Set to 0 to disable sleeping.</li>
+ *     <li>state_ttl_verifier.ttl_milli (long, default - 1000): State time-to-live.</li>
+ *     <li>report_stat.after_updates_num (long, default - 200): Report state update statistics after certain number of updates (average update chain length and clashes).</li>
+ * </ul>
+ */
+public class DataStreamStateTTLTestProgram {
+	private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+		.key("update_generator_source.keyspace")
+		.defaultValue(100);
+
+	private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+		.key("update_generator_source.sleep_time")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+		.key("update_generator_source.sleep_after_elements")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
+		.key("state_ttl_verifier.ttl_milli")
+		.defaultValue(1000L);
+
+	private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions
+		.key("report_stat.after_updates_num")
+		.defaultValue(200L);
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		setupEnvironment(env, pt);
+
+		int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
+		long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
+			UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
+		long sleepTime = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_TIME.key(),
+			UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue());
+		Time ttl = Time.milliseconds(pt.getLong(STATE_TTL_VERIFIER_TTL_MILLI.key(),
+			STATE_TTL_VERIFIER_TTL_MILLI.defaultValue()));
+		long reportStatAfterUpdatesNum = pt.getLong(REPORT_STAT_AFTER_UPDATES_NUM.key(),
+			REPORT_STAT_AFTER_UPDATES_NUM.defaultValue());
+
+		StateTtlConfiguration ttlConfig = StateTtlConfiguration.newBuilder(ttl).build();
+
+		env
+			.addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime))
+			.name("TtlStateUpdateSource")
+			.keyBy(TtlStateUpdate::getKey)
+			.flatMap(new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum))
+			.name("TtlVerifyUpdateFunction")
+			.addSink(new PrintSinkFunction<>())
+			.name("PrintFailedVerifications");
+
+		env.execute("State TTL test job");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
new file mode 100644
index 0000000..e89b544
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/** Randomly generated keyed state updates per state type. */
+class TtlStateUpdate implements Serializable {
+	private final int key;
+
+	@Nonnull
+	private final Map<String, Object> updates;
+
+	TtlStateUpdate(int key, @Nonnull Map<String, Object> updates) {
+		this.key = key;
+		this.updates = updates;
+	}
+
+	int getKey() {
+		return key;
+	}
+
+	Object getUpdate(String verifierId) {
+		return updates.get(verifierId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
new file mode 100644
index 0000000..6aff14e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+/**
+ * Source of randomly generated keyed state updates.
+ *
+ * <p>Internal loop generates {@code sleepAfterElements} state updates
+ * for each verifier from {@link TtlStateVerifier#VERIFIERS} using {@link TtlStateVerifier#generateRandomUpdate}
+ * and waits for {@code sleepTime} to continue generation.
+ */
+class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> {
+	private final int maxKey;
+	private final long sleepAfterElements;
+	private final long sleepTime;
+
+	/** Flag that determines if this source is running, i.e. generating events. */
+	private volatile boolean running = true;
+
+	TtlStateUpdateSource(int maxKey, long sleepAfterElements, long sleepTime) {
+		this.maxKey = maxKey;
+		this.sleepAfterElements = sleepAfterElements;
+		this.sleepTime = sleepTime;
+	}
+
+	@Override
+	public void run(SourceContext<TtlStateUpdate> ctx) throws Exception {
+		Random random = new Random();
+		long elementsBeforeSleep = sleepAfterElements;
+		while (running) {
+			for (int i = 0; i < sleepAfterElements; i++) {
+				synchronized (ctx.getCheckpointLock()) {
+					Map<String, Object> updates = TtlStateVerifier.VERIFIERS.stream()
+						.collect(Collectors.toMap(TtlStateVerifier::getId, TtlStateVerifier::generateRandomUpdate));
+					ctx.collect(new TtlStateUpdate(random.nextInt(maxKey), updates));
+				}
+			}
+
+			if (sleepTime > 0) {
+				if (elementsBeforeSleep == 1) {
+					elementsBeforeSleep = sleepAfterElements;
+					long rnd = sleepTime < Integer.MAX_VALUE ? random.nextInt((int) sleepTime) : 0L;
+					Thread.sleep(rnd + sleepTime);
+				} else if (elementsBeforeSleep > 1) {
+					--elementsBeforeSleep;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
new file mode 100644
index 0000000..a99a45f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
+import org.apache.flink.streaming.tests.verify.TtlUpdateContext;
+import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
+import org.apache.flink.streaming.tests.verify.ValueWithTs;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Update state with TTL for each verifier.
+ *
+ * <p>This function for each verifier from {@link TtlStateVerifier#VERIFIERS}
+ * - creates state with TTL
+ * - creates state of previous updates for further verification against it
+ * - receives random state update
+ * - gets state value before update
+ * - updates state with random value
+ * - gets state value after update
+ * - checks if this update clashes with any previous updates
+ * - if clashes, clears state and recreate update
+ * - verifies last update against previous updates
+ * - emits verification context in case of failure
+ */
+class TtlVerifyUpdateFunction
+	extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {
+	private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);
+
+	@Nonnull
+	private final StateTtlConfiguration ttlConfig;
+	private final long ttl;
+	private final UpdateStat stat;
+
+	private transient Map<String, State> states;
+	private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;
+
+	TtlVerifyUpdateFunction(@Nonnull StateTtlConfiguration ttlConfig, long reportStatAfterUpdatesNum) {
+		this.ttlConfig = ttlConfig;
+		this.ttl = ttlConfig.getTtl().toMilliseconds();
+		this.stat = new UpdateStat(reportStatAfterUpdatesNum);
+	}
+
+	@Override
+	public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Exception {
+		for (TtlStateVerifier<?, ?> verifier : TtlStateVerifier.VERIFIERS) {
+			TtlVerificationContext<?, ?> verificationContext = generateUpdateAndVerificationContext(updates, verifier);
+			if (!verifier.verify(verificationContext)) {
+				out.collect(verificationContext.toString());
+			}
+		}
+	}
+
+	private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(
+		TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception {
+		List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId());
+		Object update = updates.getUpdate(verifier.getId());
+		TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update);
+		boolean clashes = updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates);
+		if (clashes) {
+			resetState(verifier.getId());
+			prevUpdates = Collections.emptyList();
+			updateContext = performUpdate(verifier, update);
+		}
+		stat.update(clashes, prevUpdates.size());
+		prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
+		return new TtlVerificationContext<>(updates.getKey(), verifier.getId(), prevUpdates, updateContext);
+	}
+
+	private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception {
+		return StreamSupport
+			.stream(prevUpdatesByVerifierId.get(verifierId).get().spliterator(), false)
+			.collect(Collectors.toList());
+	}
+
+	private TtlUpdateContext<?, ?> performUpdate(
+		TtlStateVerifier<?, ?> verifier, Object update) throws Exception {
+		State state = states.get(verifier.getId());
+		long timestampBeforeUpdate = System.currentTimeMillis();
+		Object valueBeforeUpdate = verifier.get(state);
+		verifier.update(state, update);
+		Object updatedValue = verifier.get(state);
+		return new TtlUpdateContext<>(timestampBeforeUpdate,
+			valueBeforeUpdate, update, updatedValue, System.currentTimeMillis());
+	}
+
+	private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, List<ValueWithTs<?>> prevUpdates) {
+		return tooSlow(update) ||
+			(!prevUpdates.isEmpty() && prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update)));
+	}
+
+	private boolean tooSlow(ValueWithTs<?> update) {
+		return update.getTimestampAfterUpdate() - update.getTimestampBeforeUpdate() >= ttl;
+	}
+
+	private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> nextUpdate) {
+		return prevUpdate.getTimestampAfterUpdate() + ttl >= nextUpdate.getTimestampBeforeUpdate() &&
+			prevUpdate.getTimestampBeforeUpdate() + ttl <= nextUpdate.getTimestampAfterUpdate();
+	}
+
+	private void resetState(String verifierId) {
+		states.get(verifierId).clear();
+		prevUpdatesByVerifierId.get(verifierId).clear();
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) {
+
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) {
+		states = TtlStateVerifier.VERIFIERS.stream()
+			.collect(Collectors.toMap(TtlStateVerifier::getId, v -> v.createState(context, ttlConfig)));
+		prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
+			.collect(Collectors.toMap(TtlStateVerifier::getId, v -> {
+				Preconditions.checkNotNull(v);
+				TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer());
+				ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>(
+					"TtlPrevValueState_" + v.getId(), typeSerializer);
+				KeyedStateStore store = context.getKeyedStateStore();
+				return store.getListState(stateDesc);
+			}));
+	}
+
+	private static class UpdateStat implements Serializable {
+		final long reportStatAfterUpdatesNum;
+		long updates = 0;
+		long clashes = 0;
+		long prevUpdatesNum = 0;
+
+		UpdateStat(long reportStatAfterUpdatesNum) {
+			this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
+		}
+
+		void update(boolean clash, long prevUpdatesSize) {
+			updates++;
+			if (clash) {
+				clashes++;
+			}
+			prevUpdatesNum += prevUpdatesSize;
+			if (updates % reportStatAfterUpdatesNum == 0) {
+				LOG.info(String.format("Avg update chain length: %d, clash stat: %d/%d",
+					prevUpdatesNum / updates, clashes, updates));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
new file mode 100644
index 0000000..c56ff19
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+/** Base class for State TTL verifiers. */
+abstract class AbstractTtlStateVerifier<D extends StateDescriptor<S, SV>, S extends State, SV, UV, GV>
+	implements TtlStateVerifier<UV, GV> {
+	static final Random RANDOM = new Random();
+
+	@Nonnull
+	final D stateDesc;
+
+	AbstractTtlStateVerifier(@Nonnull D stateDesc) {
+		this.stateDesc = stateDesc;
+	}
+
+	@Nonnull
+	static String randomString() {
+		return StringUtils.getRandomString(RANDOM, 2, 20);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	@Nonnull
+	public State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig) {
+		stateDesc.enableTimeToLive(ttlConfig);
+		return createState(context);
+	}
+
+	abstract State createState(FunctionInitializationContext context);
+
+	@SuppressWarnings("unchecked")
+	@Override
+	@Nonnull
+	public TypeSerializer<UV> getUpdateSerializer() {
+		return (TypeSerializer<UV>) stateDesc.getSerializer();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public GV get(@Nonnull State state) throws Exception {
+		return getInternal((S) state);
+	}
+
+	abstract GV getInternal(@Nonnull S state) throws Exception;
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void update(@Nonnull State state, Object update) throws Exception {
+		updateInternal((S) state, (UV) update);
+	}
+
+	abstract void updateInternal(@Nonnull S state, UV update) throws Exception;
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContextRaw) {
+		TtlVerificationContext<UV, GV> verificationContext = (TtlVerificationContext<UV, GV>) verificationContextRaw;
+		List<ValueWithTs<UV>> updates = new ArrayList<>(verificationContext.getPrevUpdates());
+		long currentTimestamp = verificationContext.getUpdateContext().getTimestampBeforeUpdate();
+		GV prevValue = expected(updates, currentTimestamp);
+		GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate();
+		ValueWithTs<UV> update = verificationContext.getUpdateContext().getUpdateWithTs();
+		GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue();
+		updates.add(update);
+		GV expectedValue = expected(updates, currentTimestamp);
+		return Objects.equals(valueBeforeUpdate, prevValue) && Objects.equals(updatedValue, expectedValue);
+	}
+
+	abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long currentTimestamp);
+
+	boolean expired(long lastTimestamp, long currentTimestamp) {
+		return lastTimestamp + stateDesc.getTtlConfig().getTtl().toMilliseconds() <= currentTimestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
new file mode 100644
index 0000000..960bbe7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier<
+	AggregatingStateDescriptor<Integer, Long, String>, AggregatingState<Integer, String>, Long, Integer, String> {
+	TtlAggregatingStateVerifier() {
+		super(new AggregatingStateDescriptor<>("TtlAggregatingStateVerifier", AGG_FUNC, LongSerializer.INSTANCE));
+	}
+
+	@Override
+	@Nonnull
+	State createState(@Nonnull FunctionInitializationContext context) {
+		return context.getKeyedStateStore().getAggregatingState(stateDesc);
+	}
+
+	@Override
+	@Nonnull
+	public TypeSerializer<Integer> getUpdateSerializer() {
+		return IntSerializer.INSTANCE;
+	}
+
+	@Override
+	@Nonnull
+	public Integer generateRandomUpdate() {
+		return RANDOM.nextInt(100);
+	}
+
+	@Override
+	String getInternal(@Nonnull AggregatingState<Integer, String> state) throws Exception {
+		return state.get();
+	}
+
+	@Override
+	void updateInternal(@Nonnull AggregatingState<Integer, String> state, Integer update) throws Exception {
+		state.add(update);
+	}
+
+	@Override
+	String expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+		if (updates.isEmpty()) {
+			return null;
+		}
+		long acc = AGG_FUNC.createAccumulator();
+		long lastTs = updates.get(0).getTimestampAfterUpdate();
+		for (ValueWithTs<Integer> update : updates) {
+			if (expired(lastTs, update.getTimestampAfterUpdate())) {
+				acc = AGG_FUNC.createAccumulator();
+			}
+			acc = AGG_FUNC.add(update.getValue(), acc);
+			lastTs = update.getTimestampAfterUpdate();
+		}
+		return expired(lastTs, currentTimestamp) ? null : AGG_FUNC.getResult(acc);
+	}
+
+	private static final AggregateFunction<Integer, Long, String> AGG_FUNC =
+		new AggregateFunction<Integer, Long, String>() {
+			@Override
+			public Long createAccumulator() {
+				return 3L;
+			}
+
+			@Override
+			public Long add(Integer value, Long accumulator) {
+				return accumulator + value;
+			}
+
+			@Override
+			public String getResult(Long accumulator) {
+				return Long.toString(accumulator);
+			}
+
+			@Override
+			public Long merge(Long a, Long b) {
+				return a + b;
+			}
+		};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
new file mode 100644
index 0000000..c1cc761
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+@SuppressWarnings("deprecation")
+class TtlFoldingStateVerifier extends AbstractTtlStateVerifier<
+	FoldingStateDescriptor<Integer, Long>, FoldingState<Integer, Long>, Long, Integer, Long> {
+	private static final long INIT_VAL = 5L;
+
+	TtlFoldingStateVerifier() {
+		super(new FoldingStateDescriptor<>(
+			"TtlFoldingStateVerifier", INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE));
+	}
+
+	@Override
+	@Nonnull
+	State createState(@Nonnull FunctionInitializationContext context) {
+		return context.getKeyedStateStore().getFoldingState(stateDesc);
+	}
+
+	@Override
+	@Nonnull
+	public TypeSerializer<Integer> getUpdateSerializer() {
+		return IntSerializer.INSTANCE;
+	}
+
+	@Override
+	@Nonnull
+	public Integer generateRandomUpdate() {
+		return RANDOM.nextInt(100);
+	}
+
+	@Override
+	Long getInternal(@Nonnull FoldingState<Integer, Long> state) throws Exception {
+		return state.get();
+	}
+
+	@Override
+	void updateInternal(@Nonnull FoldingState<Integer, Long> state, Integer update) throws Exception {
+		state.add(update);
+	}
+
+	@Override
+	Long expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+		if (updates.isEmpty()) {
+			return null;
+		}
+		long acc = INIT_VAL;
+		long lastTs = updates.get(0).getTimestampAfterUpdate();
+		for (ValueWithTs<Integer> update : updates) {
+			if (expired(lastTs, update.getTimestampAfterUpdate())) {
+				acc = INIT_VAL;
+			}
+			acc += update.getValue();
+			lastTs = update.getTimestampAfterUpdate();
+		}
+		return expired(lastTs, currentTimestamp) ? null : acc;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
new file mode 100644
index 0000000..b355aa9
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+class TtlListStateVerifier extends AbstractTtlStateVerifier<
+	ListStateDescriptor<String>, ListState<String>, List<String>, String, List<String>> {
+	TtlListStateVerifier() {
+		super(new ListStateDescriptor<>("TtlListStateVerifier", StringSerializer.INSTANCE));
+	}
+
+	@Override
+	@Nonnull
+	State createState(@Nonnull FunctionInitializationContext context) {
+		return context.getKeyedStateStore().getListState(stateDesc);
+	}
+
+	@Override
+	@Nonnull
+	public TypeSerializer<String> getUpdateSerializer() {
+		return StringSerializer.INSTANCE;
+	}
+
+	@Override
+	@Nonnull
+	public String generateRandomUpdate() {
+		return randomString();
+	}
+
+	@Override
+	@Nonnull
+	List<String> getInternal(@Nonnull ListState<String> state) throws Exception {
+		return StreamSupport.stream(state.get().spliterator(), false)
+			.collect(Collectors.toList());
+	}
+
+	@Override
+	void updateInternal(@Nonnull ListState<String> state, String update) throws Exception {
+		state.add(update);
+	}
+
+	@Override
+	@Nonnull
+	List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) {
+		return updates.stream()
+			.filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp))
+			.map(ValueWithTs::getValue)
+			.collect(Collectors.toList());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
new file mode 100644
index 0000000..a9d6b36
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+class TtlMapStateVerifier extends AbstractTtlStateVerifier<
+	MapStateDescriptor<String, String>, MapState<String, String>,
+	Map<String, String>, Tuple2<String, String>, Map<String, String>> {
+	private static final List<String> KEYS = new ArrayList<>();
+	static {
+		IntStream.range(0, RANDOM.nextInt(5) + 5).forEach(i -> KEYS.add(randomString()));
+	}
+
+	TtlMapStateVerifier() {
+		super(new MapStateDescriptor<>("TtlMapStateVerifier", StringSerializer.INSTANCE, StringSerializer.INSTANCE));
+	}
+
+	@Override
+	@Nonnull
+	State createState(@Nonnull FunctionInitializationContext context) {
+		return context.getKeyedStateStore().getMapState(stateDesc);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	@Nonnull
+	public TypeSerializer<Tuple2<String, String>> getUpdateSerializer() {
+		return new TupleSerializer(
+			Tuple2.class, new TypeSerializer[] {StringSerializer.INSTANCE, StringSerializer.INSTANCE});
+	}
+
+	@Override
+	@Nonnull
+	public Tuple2<String, String> generateRandomUpdate() {
+		return Tuple2.of(KEYS.get(RANDOM.nextInt(KEYS.size())), randomString());
+	}
+
+	@Override
+	@Nonnull
+	Map<String, String> getInternal(@Nonnull MapState<String, String> state) throws Exception {
+		return StreamSupport.stream(state.entries().spliterator(), false)
+			.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+	}
+
+	@Override
+	void updateInternal(@Nonnull MapState<String, String> state, Tuple2<String, String> update) throws Exception {
+		state.put(update.f0, update.f1);
+	}
+
+	@Override
+	@Nonnull
+	Map<String, String> expected(@Nonnull List<ValueWithTs<Tuple2<String, String>>> updates, long currentTimestamp) {
+		return updates.stream()
+			.collect(Collectors.groupingBy(u -> u.getValue().f0))
+			.entrySet().stream()
+			.map(e -> e.getValue().get(e.getValue().size() - 1))
+			.filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp))
+			.map(ValueWithTs::getValue)
+			.collect(Collectors.toMap(u -> u.f0, u -> u.f1));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
new file mode 100644
index 0000000..773be05
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+class TtlReducingStateVerifier extends AbstractTtlStateVerifier<
+	ReducingStateDescriptor<Integer>, ReducingState<Integer>, Integer, Integer, Integer> {
+	TtlReducingStateVerifier() {
+		super(new ReducingStateDescriptor<>(
+			"TtlReducingStateVerifier",
+			(ReduceFunction<Integer>) (value1, value2) -> value1 + value2,
+			IntSerializer.INSTANCE));
+	}
+
+	@Override
+	@Nonnull
+	State createState(@Nonnull FunctionInitializationContext context) {
+		return context.getKeyedStateStore().getReducingState(stateDesc);
+	}
+
+	@Override
+	@Nonnull
+	public TypeSerializer<Integer> getUpdateSerializer() {
+		return IntSerializer.INSTANCE;
+	}
+
+	@Override
+	@Nonnull
+	public Integer generateRandomUpdate() {
+		return RANDOM.nextInt(100);
+	}
+
+	@Override
+	Integer getInternal(@Nonnull ReducingState<Integer> state) throws Exception {
+		return state.get();
+	}
+
+	@Override
+	void updateInternal(@Nonnull ReducingState<Integer> state, Integer update) throws Exception {
+		state.add(update);
+	}
+
+	@Override
+	Integer expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+		if (updates.isEmpty()) {
+			return null;
+		}
+		int acc = 0;
+		long lastTs = updates.get(0).getTimestampAfterUpdate();
+		for (ValueWithTs<Integer> update : updates) {
+			if (expired(lastTs, update.getTimestampAfterUpdate())) {
+				acc = 0;
+			}
+			acc += update.getValue();
+			lastTs = update.getTimestampAfterUpdate();
+		}
+		return expired(lastTs, currentTimestamp) ? null : acc;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
new file mode 100644
index 0000000..e1c2e07
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** TTL state verifier interface. */
+public interface TtlStateVerifier<UV, GV> {
+	List<TtlStateVerifier<?, ?>> VERIFIERS = Arrays.asList(
+		new TtlValueStateVerifier(),
+		new TtlListStateVerifier(),
+		new TtlMapStateVerifier(),
+		new TtlAggregatingStateVerifier(),
+		new TtlReducingStateVerifier(),
+		new TtlFoldingStateVerifier()
+	);
+
+	@Nonnull
+	default String getId() {
+		return this.getClass().getSimpleName();
+	}
+
+	@Nonnull
+	State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig);
+
+	@Nonnull
+	TypeSerializer<UV> getUpdateSerializer();
+
+	UV generateRandomUpdate();
+
+	GV get(@Nonnull State state) throws Exception;
+
+	void update(@Nonnull State state, Object update) throws Exception;
+
+	boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContext);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
new file mode 100644
index 0000000..959340b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/** Contains context relevant for state update with TTL. */
+public class TtlUpdateContext<UV, GV> implements Serializable {
+	private final long timestampBeforeUpdate;
+	private final GV valueBeforeUpdate;
+	private final UV update;
+	private final GV updatedValue;
+	private final long timestampAfterUpdate;
+
+	public TtlUpdateContext(
+		long timestampBeforeUpdate,
+		GV valueBeforeUpdate, UV update, GV updatedValue,
+		long timestampAfterUpdate) {
+		this.valueBeforeUpdate = valueBeforeUpdate;
+		this.update = update;
+		this.updatedValue = updatedValue;
+		this.timestampBeforeUpdate = timestampBeforeUpdate;
+		this.timestampAfterUpdate = timestampAfterUpdate;
+	}
+
+	long getTimestampBeforeUpdate() {
+		return timestampBeforeUpdate;
+	}
+
+	GV getValueBeforeUpdate() {
+		return valueBeforeUpdate;
+	}
+
+	@Nonnull
+	public ValueWithTs<UV> getUpdateWithTs() {
+		return new ValueWithTs<>(update, timestampBeforeUpdate, timestampAfterUpdate);
+	}
+
+	GV getUpdatedValue() {
+		return updatedValue;
+	}
+
+	@Override
+	public String toString() {
+		return "TtlUpdateContext{" +
+			"timestampBeforeUpdate=" + timestampBeforeUpdate +
+			", valueBeforeUpdate=" + valueBeforeUpdate +
+			", update=" + update +
+			", updatedValue=" + updatedValue +
+			", timestampAfterUpdate=" + timestampAfterUpdate +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
new file mode 100644
index 0000000..fa4929b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+class TtlValueStateVerifier
+	extends AbstractTtlStateVerifier<ValueStateDescriptor<String>, ValueState<String>, String, String, String> {
+	TtlValueStateVerifier() {
+		super(new ValueStateDescriptor<>("TtlValueStateVerifier", StringSerializer.INSTANCE));
+	}
+
+	@Override
+	@Nonnull
+	State createState(FunctionInitializationContext context) {
+		return context.getKeyedStateStore().getState(stateDesc);
+	}
+
+	@Nonnull
+	public String generateRandomUpdate() {
+		return randomString();
+	}
+
+	@Override
+	String getInternal(@Nonnull ValueState<String> state) throws Exception {
+		return state.value();
+	}
+
+	@Override
+	void updateInternal(@Nonnull ValueState<String> state, String update) throws Exception {
+		state.update(update);
+	}
+
+	@Override
+	String expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) {
+		if (updates.isEmpty()) {
+			return null;
+		}
+		ValueWithTs<String> lastUpdate = updates.get(updates.size() - 1);
+		return expired(lastUpdate.getTimestampAfterUpdate(), currentTimestamp) ? null : lastUpdate.getValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
new file mode 100644
index 0000000..4c985cd
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Data to verify state update with TTL. */
+public class TtlVerificationContext<UV, GV> implements Serializable {
+	private final int key;
+	@Nonnull
+	private final String  verifierId;
+	@Nonnull
+	private final List<ValueWithTs<UV>> prevUpdates;
+	@Nonnull
+	private final TtlUpdateContext<UV, GV> updateContext;
+
+	@SuppressWarnings("unchecked")
+	public TtlVerificationContext(
+		int key,
+		@Nonnull String verifierId,
+		@Nonnull List<ValueWithTs<?>> prevUpdates,
+		@Nonnull TtlUpdateContext<?, ?> updateContext) {
+		this.key = key;
+		this.verifierId = verifierId;
+		this.prevUpdates = new ArrayList<>();
+		prevUpdates.forEach(pu -> this.prevUpdates.add((ValueWithTs<UV>) pu));
+		this.updateContext = (TtlUpdateContext<UV, GV>) updateContext;
+	}
+
+	@Nonnull
+	List<ValueWithTs<UV>> getPrevUpdates() {
+		return prevUpdates;
+	}
+
+	@Nonnull
+	TtlUpdateContext<UV, GV> getUpdateContext() {
+		return updateContext;
+	}
+
+	@Override
+	public String toString() {
+		return "TtlVerificationContext{" +
+			"key=" + key +
+			", verifierId='" + verifierId + '\'' +
+			", prevUpdates=" + prevUpdates +
+			", updateContext=" + updateContext +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
new file mode 100644
index 0000000..9302377
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/** User state value with timestamps before and after update. */
+public class ValueWithTs<V> implements Serializable {
+	private final V value;
+	private final long timestampBeforeUpdate;
+	private final long timestampAfterUpdate;
+
+	public ValueWithTs(V value, long timestampBeforeUpdate, long timestampAfterUpdate) {
+		this.value = value;
+		this.timestampBeforeUpdate = timestampBeforeUpdate;
+		this.timestampAfterUpdate = timestampAfterUpdate;
+	}
+
+	V getValue() {
+		return value;
+	}
+
+	public long getTimestampBeforeUpdate() {
+		return timestampBeforeUpdate;
+	}
+
+	public long getTimestampAfterUpdate() {
+		return timestampAfterUpdate;
+	}
+
+	@Override
+	public String toString() {
+		return "ValueWithTs{" +
+			"value=" + value +
+			", timestampBeforeUpdate=" + timestampBeforeUpdate +
+			", timestampAfterUpdate=" + timestampAfterUpdate +
+			'}';
+	}
+
+	/** Serializer for Serializer. */
+	public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
+
+		public Serializer(TypeSerializer<?> userValueSerializer) {
+			super(true, userValueSerializer, LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+		}
+
+		@SuppressWarnings("unchecked")
+		Serializer(PrecomputedParameters precomputed, TypeSerializer<?>... fieldSerializers) {
+			super(precomputed, fieldSerializers);
+		}
+
+		@Override
+		public ValueWithTs<?> createInstance(@Nonnull Object ... values) {
+			return new ValueWithTs<>(values[0], (Long) values[1], (Long) values[2]);
+		}
+
+		@Override
+		protected void setField(@Nonnull ValueWithTs<?> value, int index, Object fieldValue) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		protected Object getField(@Nonnull ValueWithTs<?> value, int index) {
+			switch (index) {
+				case 0:
+					return value.getValue();
+				case 1:
+					return value.getTimestampBeforeUpdate();
+				case 2:
+					return value.getTimestampAfterUpdate();
+				default:
+					throw new FlinkRuntimeException("Unexpected field index for ValueWithTs");
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance(
+			PrecomputedParameters precomputed,
+			TypeSerializer<?>... originalSerializers) {
+			return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 86dd3db..4abf595 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -50,6 +50,7 @@ under the License.
 		<module>flink-elasticsearch5-test</module>
 		<module>flink-quickstart-test</module>
 		<module>flink-confluent-schema-registry</module>
+		<module>flink-stream-state-ttl-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 15c73d5..dc8424f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -102,5 +102,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip
 
 run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
+run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
+run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
+
 printf "\n[PASS] All tests passed\n"
 exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index c78afe7..f4563cc 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -240,6 +240,15 @@ function start_cluster {
   done
 }
 
+function start_taskmanagers {
+    tmnum=$1
+    echo "Start ${tmnum} more task managers"
+    for (( c=0; c<tmnum; c++ ))
+    do
+        $FLINK_DIR/bin/taskmanager.sh start
+    done
+}
+
 function start_and_wait_for_tm {
 
   tm_query_result=$(curl -s "http://localhost:8081/taskmanagers")
@@ -456,18 +465,17 @@ function s3_delete {
     https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
-# This function starts the given number of task managers and monitors their processes. If a task manager process goes
-# away a replacement is started.
+# This function starts the given number of task managers and monitors their processes.
+# If a task manager process goes away a replacement is started.
 function tm_watchdog {
   local expectedTm=$1
   while true;
   do
     runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
     count=$((expectedTm-runningTm))
-    for (( c=0; c<count; c++ ))
-    do
-      $FLINK_DIR/bin/taskmanager.sh start > /dev/null
-    done
+    if (( count != 0 )); then
+        start_taskmanagers ${count} > /dev/null
+    fi
     sleep 5;
   done
 }
@@ -508,7 +516,8 @@ function rollback_flink_slf4j_metric_reporter() {
 
 function get_metric_processed_records {
   OPERATOR=$1
-  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
+  JOB_NAME="${2:-General purpose test job}"
+  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
   if [ -z $N ]; then
     N=0
   fi
@@ -517,7 +526,8 @@ function get_metric_processed_records {
 
 function get_num_metric_samples {
   OPERATOR=$1
-  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
+  JOB_NAME="${2:-General purpose test job}"
+  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
   if [ -z $N ]; then
     N=0
   fi
@@ -527,13 +537,14 @@ function get_num_metric_samples {
 function wait_oper_metric_num_in_records {
     OPERATOR=$1
     MAX_NUM_METRICS="${2:-200}"
-    NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
-    OLD_NUM_METRICS=${3:-${NUM_METRICS}}
+    JOB_NAME="${3:-General purpose test job}"
+    NUM_METRICS=$(get_num_metric_samples ${OPERATOR} '${JOB_NAME}')
+    OLD_NUM_METRICS=${4:-${NUM_METRICS}}
     # monitor the numRecordsIn metric of the state machine operator in the second execution
     # we let the test finish once the second restore execution has processed 200 records
     while : ; do
-      NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
-      NUM_RECORDS=$(get_metric_processed_records ${OPERATOR})
+      NUM_METRICS=$(get_num_metric_samples ${OPERATOR} "${JOB_NAME}")
+      NUM_RECORDS=$(get_metric_processed_records ${OPERATOR} "${JOB_NAME}")
 
       # only account for metrics that appeared in the second execution
       if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
@@ -541,7 +552,7 @@ function wait_oper_metric_num_in_records {
       fi
 
       if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then
-        echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+        echo "Waiting for job to process up to ${MAX_NUM_METRICS} records, current progress: ${NUM_RECORDS} records ..."
         sleep 1
       else
         break

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
new file mode 100755
index 0000000..fb911f3
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+STATE_BACKEND_TYPE="${1:-file}"
+STATE_BACKEND_FILE_ASYNC="${2:-false}"
+TTL="${3:-1000}"
+PRECISION="${4:-5}"
+PARALLELISM="${5-3}"
+UPDATE_NUM="${6-1000}"
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-stream-state-ttl-test
+TEST_PROGRAM_NAME=DataStreamStateTTLTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+setup_flink_slf4j_metric_reporter
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # revert our modifications to the Flink distribution
+  rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+start_cluster
+start_taskmanagers $PARALLELISM
+
+function job_id() {
+    CMD="${FLINK_DIR}/bin/flink run -d -p ${PARALLELISM} ${TEST_PROGRAM_JAR} \
+      --test.semantics exactly-once \
+      --environment.parallelism ${PARALLELISM} \
+      --state_backend ${STATE_BACKEND_TYPE} \
+      --state_ttl_verifier.ttl_milli ${TTL} \
+      --state_ttl_verifier.precision_milli ${PRECISION} \
+      --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
+      --state_backend.file.async ${STATE_BACKEND_FILE_ASYNC} \
+      --update_generator_source.sleep_time 10 \
+      --update_generator_source.sleep_after_elements 1"
+    echo "${CMD}"
+}
+
+JOB_CMD=$(job_id)
+echo ${JOB_CMD}
+JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g')
+wait_job_running ${JOB}
+wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} 'State TTL test job'
+
+SAVEPOINT_PATH=$(take_savepoint ${JOB} ${TEST_DATA_DIR} \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job ${JOB}
+
+JOB_CMD=$(job_id)
+echo ${JOB_CMD}
+JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g')
+wait_job_running ${JOB}
+wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} "State TTL test job"
+
+# if verification fails job produces failed TTL'ed state updates,
+# output would be non-empty and the test will not pass