You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/29 12:20:23 UTC

[2/2] flink git commit: [FLINK-9514, FLINK-9515, FLINK-9516] Introduce wrapper to enhance state with ttl

[FLINK-9514,FLINK-9515,FLINK-9516] Introduce wrapper to enhance state with ttl

This closes #6186.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c13e00c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c13e00c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c13e00c

Branch: refs/heads/master
Commit: 2c13e00c1c108163466f9944de2a32dd5a1b7352
Parents: 92a6a22
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Mon Jun 4 17:28:40 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jun 29 14:18:59 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/ttl/AbstractTtlDecorator.java | 110 ++++++++++++
 .../runtime/state/ttl/AbstractTtlState.java     |  85 +++++++++
 .../runtime/state/ttl/TtlAggregateFunction.java |  83 +++++++++
 .../runtime/state/ttl/TtlAggregatingState.java  |  80 +++++++++
 .../flink/runtime/state/ttl/TtlConfig.java      |  96 +++++++++++
 .../runtime/state/ttl/TtlFoldFunction.java      |  49 ++++++
 .../runtime/state/ttl/TtlFoldingState.java      |  69 ++++++++
 .../flink/runtime/state/ttl/TtlListState.java   | 172 +++++++++++++++++++
 .../flink/runtime/state/ttl/TtlMapState.java    | 134 +++++++++++++++
 .../runtime/state/ttl/TtlReduceFunction.java    |  53 ++++++
 .../runtime/state/ttl/TtlReducingState.java     |  73 ++++++++
 .../runtime/state/ttl/TtlTimeProvider.java      |  26 +++
 .../flink/runtime/state/ttl/TtlValue.java       |  47 +++++
 .../flink/runtime/state/ttl/TtlValueState.java  |  53 ++++++
 .../runtime/state/ttl/MockInternalKvState.java  |  96 +++++++++++
 .../runtime/state/ttl/MockInternalMapState.java |  88 ++++++++++
 .../state/ttl/MockInternalMergingState.java     |  52 ++++++
 .../runtime/state/ttl/MockTimeProvider.java     |  28 +++
 .../state/ttl/TtlAggregatingStateTest.java      | 115 +++++++++++++
 .../runtime/state/ttl/TtlFoldingStateTest.java  |  71 ++++++++
 .../runtime/state/ttl/TtlListStateTest.java     | 112 ++++++++++++
 .../state/ttl/TtlMapStatePerElementTest.java    |  47 +++++
 .../runtime/state/ttl/TtlMapStateTest.java      |  60 +++++++
 .../runtime/state/ttl/TtlMergingStateBase.java  | 126 ++++++++++++++
 .../runtime/state/ttl/TtlReducingStateTest.java |  94 ++++++++++
 .../runtime/state/ttl/TtlStateTestBase.java     | 162 +++++++++++++++++
 .../runtime/state/ttl/TtlValueStateTest.java    |  62 +++++++
 27 files changed, 2243 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
new file mode 100644
index 0000000..12efc00
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Base class for TTL logic wrappers: keeps the wrapped object, the TTL settings and the expiration helpers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+	/** Wrapped original object (a state handler or a user function). */
+	final T original;
+
+	final TtlConfig config; // TTL settings this decorator was created with
+
+	final TtlTimeProvider timeProvider; // supplies the current timestamp for wrapping and expiration checks
+
+	/** Whether to renew expiration timestamp on state read access. */
+	final boolean updateTsOnRead;
+
+	/** Whether an expired user value may still be returned if it is not cleaned up yet. */
+	final boolean returnExpired;
+
+	/** State value time to live in milliseconds. */
+	final long ttl;
+
+	AbstractTtlDecorator(
+		T original,
+		TtlConfig config,
+		TtlTimeProvider timeProvider) {
+		Preconditions.checkNotNull(original);
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(timeProvider);
+		Preconditions.checkArgument(config.getTtlUpdateType() != TtlConfig.TtlUpdateType.Disabled,
+			"State does not need to be wrapped with TTL if it is configured as disabled.");
+		this.original = original;
+		this.config = config;
+		this.timeProvider = timeProvider;
+		this.updateTsOnRead = config.getTtlUpdateType() == TtlConfig.TtlUpdateType.OnReadAndWrite;
+		this.returnExpired = config.getStateVisibility() == TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
+		this.ttl = config.getTtl().toMilliseconds();
+	}
+
+	<V> V getUnexpired(TtlValue<V> ttlValue) { // null if absent, or if expired while expired values are hidden
+		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+	}
+
+	<V> boolean expired(TtlValue<V> ttlValue) { // null is never expired; otherwise expired once the deadline is reached
+		return ttlValue != null && getExpirationTimestamp(ttlValue) <= timeProvider.currentTimestamp();
+	}
+
+	private long getExpirationTimestamp(@Nonnull TtlValue<?> ttlValue) {
+		long ts = ttlValue.getLastAccessTimestamp();
+		long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl; // cap ttl so ts + ttl cannot overflow
+		return ts + ttlWithoutOverflow;
+	}
+
+	<V> TtlValue<V> wrapWithTs(V value) { // stamps the value with the current timestamp
+		return wrapWithTs(value, timeProvider.currentTimestamp());
+	}
+
+	static <V> TtlValue<V> wrapWithTs(V value, long ts) { // a null user value stays null and is not wrapped
+		return value == null ? null : new TtlValue<>(value, ts);
+	}
+
+	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) { // same user value, timestamp reset to now
+		return wrapWithTs(ttlValue.getUserValue());
+	}
+
+	<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
+		SupplierWithException<TtlValue<V>, SE> getter,
+		ThrowingConsumer<TtlValue<V>, CE> updater,
+		ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
+		TtlValue<V> ttlValue = getter.get();
+		if (ttlValue == null) {
+			return null;
+		} else if (expired(ttlValue)) {
+			stateClear.run(); // expired values are cleaned up on access, regardless of visibility
+			if (!returnExpired) {
+				return null;
+			}
+		} else if (updateTsOnRead) {
+			updater.accept(rewrapWithNewTs(ttlValue)); // still valid: prolong TTL by storing a fresh timestamp
+		}
+		return ttlValue.getUserValue(); // unexpired, or expired but still visible per configuration
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
new file mode 100644
index 0000000..ddab8f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * Base class for TTL logic wrappers of state objects; delegates serializers, namespace and clearing to the wrapped state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <SV> The type of values kept internally in state without TTL
+ * @param <TTLSV> The type of values kept internally in state with TTL
+ * @param <S> Type of originally wrapped state object
+ */
+abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
+	extends AbstractTtlDecorator<S>
+	implements InternalKvState<K, N, SV> {
+	private final TypeSerializer<SV> valueSerializer; // serializer for the user-facing value type SV
+
+	AbstractTtlState(S original, TtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
+		super(original, config, timeProvider);
+		this.valueSerializer = valueSerializer;
+	}
+
+	<SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
+		SupplierWithException<TtlValue<T>, SE> getter,
+		ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
+		return getWithTtlCheckAndUpdate(getter, updater, original::clear); // expired values are removed by clearing the wrapped state
+	}
+
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return original.getKeySerializer();
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return original.getNamespaceSerializer();
+	}
+
+	@Override
+	public TypeSerializer<SV> getValueSerializer() {
+		return valueSerializer; // the serializer passed to the constructor, not the wrapped state's serializer
+	}
+
+	@Override
+	public void setCurrentNamespace(N namespace) {
+		original.setCurrentNamespace(namespace);
+	}
+
+	@Override
+	public byte[] getSerializedValue(
+		byte[] serializedKeyAndNamespace,
+		TypeSerializer<K> safeKeySerializer,
+		TypeSerializer<N> safeNamespaceSerializer,
+		TypeSerializer<SV> safeValueSerializer) {
+		throw new FlinkRuntimeException("Queryable state is not currently supported with TTL."); // always fails, arguments are ignored
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
new file mode 100644
index 0000000..7d35ef3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * This class wraps aggregating function with TTL logic; accumulators are stored as timestamped {@link TtlValue}s.
+ *
+ * @param <IN>  The type of the values that are aggregated (input values)
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> The type of the aggregated result
+ */
+class TtlAggregateFunction<IN, ACC, OUT>
+	extends AbstractTtlDecorator<AggregateFunction<IN, ACC, OUT>>
+	implements AggregateFunction<IN, TtlValue<ACC>, OUT> {
+	ThrowingRunnable<Exception> stateClear; // not set by the constructor; must be non-null before getResult is called
+	ThrowingConsumer<TtlValue<ACC>, Exception> updater; // not set by the constructor; must be non-null before getResult is called
+
+	TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, TtlConfig config, TtlTimeProvider timeProvider) {
+		super(aggFunction, config, timeProvider);
+	}
+
+	@Override
+	public TtlValue<ACC> createAccumulator() {
+		return wrapWithTs(original.createAccumulator());
+	}
+
+	@Override
+	public TtlValue<ACC> add(IN value, TtlValue<ACC> accumulator) {
+		ACC userAcc = getUnexpired(accumulator);
+		userAcc = userAcc == null ? original.createAccumulator() : userAcc; // restart from a fresh accumulator if none is usable
+		return wrapWithTs(original.add(value, userAcc));
+	}
+
+	@Override
+	public OUT getResult(TtlValue<ACC> accumulator) {
+		Preconditions.checkNotNull(updater, "State updater should be set in TtlAggregatingState");
+		Preconditions.checkNotNull(stateClear, "State clearing should be set in TtlAggregatingState");
+		ACC userAcc;
+		try {
+			userAcc = getWithTtlCheckAndUpdate(() -> accumulator, updater, stateClear); // may clear the state or refresh the timestamp
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Failed to retrieve original internal aggregating state", e);
+		}
+		return userAcc == null ? null : original.getResult(userAcc); // no usable accumulator yields a null result
+	}
+
+	@Override
+	public TtlValue<ACC> merge(TtlValue<ACC> a, TtlValue<ACC> b) {
+		ACC userA = getUnexpired(a);
+		ACC userB = getUnexpired(b);
+		if (userA != null && userB != null) {
+			return wrapWithTs(original.merge(userA, userB));
+		} else if (userA != null) {
+			return rewrapWithNewTs(a); // only a is usable: keep it with a fresh timestamp
+		} else if (userB != null) {
+			return rewrapWithNewTs(b); // only b is usable: keep it with a fresh timestamp
+		} else {
+			return null; // neither side holds a usable accumulator
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
new file mode 100644
index 0000000..097e49b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic and wires its callbacks into the given {@link TtlAggregateFunction}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+	extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+	implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+	TtlAggregatingState(
+		InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<ACC> valueSerializer,
+		TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+		super(originalState, config, timeProvider, valueSerializer);
+		aggregateFunction.stateClear = originalState::clear; // callback the function uses to drop an expired accumulator
+		aggregateFunction.updater = originalState::updateInternal; // callback the function uses to store a refreshed timestamp
+	}
+
+	@Override
+	public OUT get() throws Exception {
+		return original.get(); // NOTE(review): presumably the wrapped state calls TtlAggregateFunction#getResult here - confirm
+	}
+
+	@Override
+	public void add(IN value) throws Exception {
+		original.add(value); // NOTE(review): presumably the wrapped state calls TtlAggregateFunction#add here - confirm
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+
+	@Override
+	public ACC getInternal() throws Exception {
+		return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal); // clears the wrapped state if expired
+	}
+
+	@Override
+	public void updateInternal(ACC valueToStore) throws Exception {
+		original.updateInternal(wrapWithTs(valueToStore)); // stored with the current timestamp
+	}
+
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		original.mergeNamespaces(target, sources);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
new file mode 100644
index 0000000..17cac49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: add a builder for constructing this configuration.
+ */
+public class TtlConfig {
+	/**
+	 * This option value configures when to update last access timestamp which prolongs state TTL.
+	 */
+	public enum TtlUpdateType {
+		/** TTL is disabled. State does not expire. */
+		Disabled,
+		/** Last access timestamp is initialised when state is created and updated on every write operation. */
+		OnCreateAndWrite,
+		/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
+		OnReadAndWrite
+	}
+
+	/**
+	 * This option configures whether expired user value can be returned or not.
+	 */
+	public enum TtlStateVisibility {
+		/** Return expired user value if it is not cleaned up yet. */
+		ReturnExpiredIfNotCleanedUp,
+		/** Never return expired user value. */
+		NeverReturnExpired
+	}
+
+	/**
+	 * This option configures time scale to use for ttl.
+	 */
+	public enum TtlTimeCharacteristic {
+		/** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
+		ProcessingTime
+	}
+
+	private final TtlUpdateType ttlUpdateType;
+	private final TtlStateVisibility stateVisibility;
+	private final TtlTimeCharacteristic timeCharacteristic;
+	private final Time ttl; // validated in the constructor to be more than zero milliseconds
+
+	public TtlConfig(
+		TtlUpdateType ttlUpdateType,
+		TtlStateVisibility stateVisibility,
+		TtlTimeCharacteristic timeCharacteristic,
+		Time ttl) {
+		Preconditions.checkNotNull(ttlUpdateType);
+		Preconditions.checkNotNull(stateVisibility);
+		Preconditions.checkNotNull(timeCharacteristic);
+		Preconditions.checkNotNull(ttl);
+		Preconditions.checkArgument(ttl.toMilliseconds() > 0,
+			"TTL is expected to be positive");
+		this.ttlUpdateType = ttlUpdateType;
+		this.stateVisibility = stateVisibility;
+		this.timeCharacteristic = timeCharacteristic;
+		this.ttl = ttl;
+	}
+
+	public TtlUpdateType getTtlUpdateType() {
+		return ttlUpdateType;
+	}
+
+	public TtlStateVisibility getStateVisibility() {
+		return stateVisibility;
+	}
+
+	public Time getTtl() {
+		return ttl;
+	}
+
+	public TtlTimeCharacteristic getTimeCharacteristic() {
+		return timeCharacteristic;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
new file mode 100644
index 0000000..8d54d87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic; the folded accumulator is stored as a timestamped {@link TtlValue}.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction<T, ACC>
+	extends AbstractTtlDecorator<FoldFunction<T, ACC>>
+	implements FoldFunction<T, TtlValue<ACC>> {
+	private final ACC defaultAccumulator; // fold start value used when the stored accumulator is not usable
+
+	TtlFoldFunction(
+		FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
+		super(original, config, timeProvider);
+		this.defaultAccumulator = defaultAccumulator;
+	}
+
+	@Override
+	public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+		ACC userAcc = getUnexpired(accumulator);
+		userAcc = userAcc == null ? defaultAccumulator : userAcc; // absent or hidden-expired accumulator restarts from the default
+		return wrapWithTs(original.fold(userAcc, value)); // result is stamped with the current timestamp
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
new file mode 100644
index 0000000..46f56d2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
+/**
+ * This class wraps folding state with TTL logic; reads go through the TTL check, writes are timestamped.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link AggregatingState} instead
+ */
+@Deprecated
+class TtlFoldingState<K, N, T, ACC>
+	extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalFoldingState<K, N, T, TtlValue<ACC>>>
+	implements InternalFoldingState<K, N, T, ACC> {
+	TtlFoldingState(
+		InternalFoldingState<K, N, T, TtlValue<ACC>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<ACC> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public ACC get() throws Exception {
+		return getInternal(); // same TTL check and timestamp refresh as the internal accessor
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		original.add(value); // NOTE(review): presumably folded via TtlFoldFunction inside the wrapped state - confirm
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+
+	@Override
+	public ACC getInternal() throws Exception {
+		return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal); // clears the wrapped state if expired
+	}
+
+	@Override
+	public void updateInternal(ACC valueToStore) throws Exception {
+		original.updateInternal(wrapWithTs(valueToStore)); // stored with the current timestamp
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
new file mode 100644
index 0000000..6da0d8f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic; every list element carries its own timestamp.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; // treat missing state as an empty list
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue); // materialize once: used for the rewrite and for the result
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
+		return () -> new IteratorWithCleanup(finalResult.iterator()); // expired elements are filtered lazily while iterating
+	}
+
+	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+			.filter(v -> !expired(v))
+			.map(this::rewrapWithNewTs)
+			.collect(Collectors.toList());
+		if (!unexpiredWithUpdatedTs.isEmpty()) { // if everything expired, the state is not rewritten here
+			original.update(unexpiredWithUpdatedTs);
+		}
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+		original.add(wrapWithTs(value));
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		original.mergeNamespaces(target, sources);
+	}
+
+	@Override
+	public List<T> getInternal() throws Exception {
+		return collect(get());
+	}
+
+	private <E> List<E> collect(Iterable<E> iterable) { // returns the argument itself when it already is a List
+		return iterable instanceof List ? (List<E>) iterable :
+			StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+	}
+
+	@Override
+	public void updateInternal(List<T> valueToStore) throws Exception {
+		Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+		original.updateInternal(withTs(valueToStore));
+	}
+
+	private List<TtlValue<T>> withTs(List<T> values) { // wraps each element with the current timestamp
+		return values.stream().map(this::wrapWithTs).collect(Collectors.toList());
+	}
+
+	private class IteratorWithCleanup implements Iterator<T> {
+		private final Iterator<TtlValue<T>> originalIterator;
+		private boolean anyUnexpired = false; // set once at least one unexpired element has been seen
+		private boolean uncleared = true; // guards against clearing the wrapped state more than once
+		private T nextUnexpired = null; // element buffered by findNextUnexpired, handed out by next()
+
+		private IteratorWithCleanup(Iterator<TtlValue<T>> ttlIterator) {
+			this.originalIterator = ttlIterator;
+		}
+
+		@Override
+		public boolean hasNext() {
+			findNextUnexpired();
+			cleanupIfEmpty();
+			return nextUnexpired != null;
+		}
+
+		private void cleanupIfEmpty() {
+			boolean endOfIter = !originalIterator.hasNext() && nextUnexpired == null; // exhausted and nothing buffered
+			if (uncleared && !anyUnexpired && endOfIter) { // iteration ended without any unexpired element
+				original.clear();
+				uncleared = false;
+			}
+		}
+
+		@Override
+		public T next() {
+			if (hasNext()) {
+				T result = nextUnexpired;
+				nextUnexpired = null; // consume the buffered element
+				return result;
+			}
+			throw new NoSuchElementException();
+		}
+
+		private void findNextUnexpired() {
+			while (nextUnexpired == null && originalIterator.hasNext()) {
+				TtlValue<T> ttlValue = originalIterator.next();
+				if (ttlValue == null) {
+					break; // a null element ends this scan without buffering a value
+				}
+				boolean unexpired = !expired(ttlValue);
+				if (unexpired) {
+					anyUnexpired = true;
+				}
+				if (unexpired || returnExpired) { // expired elements are still handed out if visibility allows it
+					nextUnexpired = ttlValue.getUserValue();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
new file mode 100644
index 0000000..352c6c9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * <p>User values are stored in the original state wrapped via {@code wrapWithTs}. Reads are
+ * routed through {@code getWithTtlCheckAndUpdate}, which receives callbacks to read, rewrite
+ * and remove the affected entry of the original state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
+	implements InternalMapState<K, N, UK, UV> {
+	TtlMapState(
+		InternalMapState<K, N, UK, TtlValue<UV>> original,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<Map<UK, UV>> valueSerializer) {
+		super(original, config, timeProvider, valueSerializer);
+	}
+
+	/**
+	 * Looks up the user value for the given key through the TTL check.
+	 *
+	 * @param key user key to look up
+	 * @return the result of {@code getWithTtlCheckAndUpdate} for the entry of this key
+	 */
+	@Override
+	public UV get(UK key) throws Exception {
+		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+	}
+
+	/** Stores the given user value, wrapped via {@code wrapWithTs}, under the given key. */
+	@Override
+	public void put(UK key, UV value) throws Exception {
+		original.put(key, wrapWithTs(value));
+	}
+
+	/**
+	 * Stores all entries of the given map, each value wrapped via {@code wrapWithTs}.
+	 *
+	 * @param map entries to store; {@code null} is ignored
+	 */
+	@Override
+	public void putAll(Map<UK, UV> map) throws Exception {
+		if (map == null) {
+			return;
+		}
+		Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
+		// Iterate over entries to avoid one extra lookup per key.
+		for (Map.Entry<UK, UV> entry : map.entrySet()) {
+			ttlMap.put(entry.getKey(), wrapWithTs(entry.getValue()));
+		}
+		original.putAll(ttlMap);
+	}
+
+	@Override
+	public void remove(UK key) throws Exception {
+		original.remove(key);
+	}
+
+	/** Returns whether {@link #get(Object)} yields a non-null value for the given key. */
+	@Override
+	public boolean contains(UK key) throws Exception {
+		return get(key) != null;
+	}
+
+	/**
+	 * Returns the entries that pass the TTL check.
+	 *
+	 * <p>The entries of the original state are obtained once, but a fresh stream is built for
+	 * every {@code iterator()} call. A stream can be consumed only once, so handing out
+	 * {@code stream::iterator} would make the second traversal fail with an
+	 * {@link IllegalStateException}.
+	 */
+	@Override
+	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
+		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = originalEntries();
+		return () -> unexpiredEntries(withTs).iterator();
+	}
+
+	/** Fetches the entries of the original state, substituting an empty list for {@code null}. */
+	private Iterable<Map.Entry<UK, TtlValue<UV>>> originalEntries() throws Exception {
+		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
+		return withTs == null ? Collections.emptyList() : withTs;
+	}
+
+	/** Builds a new lazy stream which filters the given entries by the TTL check and unwraps them. */
+	private Stream<Map.Entry<UK, UV>> unexpiredEntries(Iterable<Map.Entry<UK, TtlValue<UV>>> withTs) {
+		return StreamSupport
+			.stream(withTs.spliterator(), false)
+			.filter(this::unexpiredAndUpdateOrCleanup)
+			.map(TtlMapState::unwrapWithoutTs);
+	}
+
+	/**
+	 * Runs the TTL check for one entry, passing callbacks that rewrite or remove it in the
+	 * original state.
+	 *
+	 * @return {@code true} if the check yields a non-null user value
+	 * @throws FlinkRuntimeException wrapping any exception raised by the check, because this
+	 *     method is used as a stream predicate and cannot throw checked exceptions
+	 */
+	private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
+		UV unexpiredValue;
+		try {
+			unexpiredValue = getWithTtlCheckAndUpdate(
+				e::getValue,
+				v -> original.put(e.getKey(), v),
+				() -> original.remove(e.getKey()));
+		} catch (Exception ex) {
+			throw new FlinkRuntimeException(ex);
+		}
+		return unexpiredValue != null;
+	}
+
+	/** Converts an entry holding a {@link TtlValue} into an entry holding the plain user value. */
+	private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
+		return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+	}
+
+	/** Returns the keys of the entries that pass the TTL check; can be traversed repeatedly. */
+	@Override
+	public Iterable<UK> keys() throws Exception {
+		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = originalEntries();
+		return () -> unexpiredEntries(withTs).map(Map.Entry::getKey).iterator();
+	}
+
+	/** Returns the user values of the entries that pass the TTL check; can be traversed repeatedly. */
+	@Override
+	public Iterable<UV> values() throws Exception {
+		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = originalEntries();
+		return () -> unexpiredEntries(withTs).map(Map.Entry::getValue).iterator();
+	}
+
+	@Override
+	public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
+		return unexpiredEntries(originalEntries()).iterator();
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
new file mode 100644
index 0000000..de593c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+/**
+ * This class wraps reducing function with TTL logic.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReduceFunction<T>
+	extends AbstractTtlDecorator<ReduceFunction<T>>
+	implements ReduceFunction<TtlValue<T>> {
+
+	TtlReduceFunction(
+		ReduceFunction<T> originalReduceFunction,
+		TtlConfig config,
+		TtlTimeProvider timeProvider) {
+		super(originalReduceFunction, config, timeProvider);
+	}
+
+	/**
+	 * Reduces two wrapped values.
+	 *
+	 * <p>If {@code getUnexpired} yields a user value for both inputs, the original function is
+	 * applied and its result is wrapped via {@code wrapWithTs}. If it yields a value for only one
+	 * input, that input is passed to {@code rewrapWithNewTs}. If it yields none, {@code null}
+	 * is returned.
+	 */
+	@Override
+	public TtlValue<T> reduce(TtlValue<T> value1, TtlValue<T> value2) throws Exception {
+		T userValue1 = getUnexpired(value1);
+		T userValue2 = getUnexpired(value2);
+		if (userValue1 == null) {
+			// Only the second value can contribute, if any.
+			return userValue2 == null ? null : rewrapWithNewTs(value2);
+		}
+		if (userValue2 == null) {
+			return rewrapWithNewTs(value1);
+		}
+		return wrapWithTs(original.reduce(userValue1, userValue2));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
new file mode 100644
index 0000000..9169d45
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+	implements InternalReducingState<K, N, T> {
+	TtlReducingState(
+		InternalReducingState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<T> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	/** Returns the same value as {@link #getInternal()}. */
+	@Override
+	public T get() throws Exception {
+		return getInternal();
+	}
+
+	/**
+	 * Adds the given user value to the original state, wrapped via {@code wrapWithTs}.
+	 *
+	 * <p>The value is wrapped the same way as in {@link #updateInternal(Object)} instead of
+	 * with the sentinel timestamp {@code Long.MAX_VALUE}. The original state may store the
+	 * wrapper as is (presumably when there is no previous value to reduce with; verify against
+	 * the state backends), and the sentinel would then be persisted as its timestamp.
+	 */
+	@Override
+	public void add(T value) throws Exception {
+		original.add(wrapWithTs(value));
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+
+	/** Delegates the namespace merge unchanged to the original state. */
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		original.mergeNamespaces(target, sources);
+	}
+
+	/**
+	 * Reads the stored value through {@code getWithTtlCheckAndUpdate}, passing the getter and
+	 * updater of the original state.
+	 */
+	@Override
+	public T getInternal() throws Exception {
+		return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+	}
+
+	/** Overwrites the stored value with the given user value wrapped via {@code wrapWithTs}. */
+	@Override
+	public void updateInternal(T valueToStore) throws Exception {
+		original.updateInternal(wrapWithTs(valueToStore));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
new file mode 100644
index 0000000..bac9d36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * Provides time to TTL logic to judge about state expiration.
+ */
+interface TtlTimeProvider {
+	/**
+	 * Returns the timestamp that TTL logic should treat as the current time.
+	 *
+	 * <p>NOTE(review): unit and clock source are not fixed by this interface; presumably
+	 * milliseconds, confirm against the implementations and the TTL configuration.
+	 */
+	long currentTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
new file mode 100644
index 0000000..99f8b0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * <p>Instances are immutable and pair a non-null user value with a timestamp.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+	private final T userValue;
+	private final long lastAccessTimestamp;
+
+	/**
+	 * Creates a wrapper for the given user value.
+	 *
+	 * @param userValue user value to wrap, must not be {@code null}
+	 * @param lastAccessTimestamp timestamp to keep along with the value
+	 */
+	TtlValue(T userValue, long lastAccessTimestamp) {
+		// checkNotNull rejects null before any field is assigned.
+		this.userValue = Preconditions.checkNotNull(userValue);
+		this.lastAccessTimestamp = lastAccessTimestamp;
+	}
+
+	/** Returns the wrapped user value. */
+	T getUserValue() {
+		return this.userValue;
+	}
+
+	/** Returns the timestamp passed at construction. */
+	long getLastAccessTimestamp() {
+		return this.lastAccessTimestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
new file mode 100644
index 0000000..9feb62d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+import java.io.IOException;
+
+/**
+ * This class wraps value state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValueState<K, N, T>
+	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
+	implements InternalValueState<K, N, T> {
+	TtlValueState(
+		InternalValueState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<T> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	/**
+	 * Reads the stored value through {@code getWithTtlCheckAndUpdate}, passing the getter and
+	 * updater of the original state.
+	 */
+	@Override
+	public T value() throws IOException {
+		return getWithTtlCheckAndUpdate(original::value, original::update);
+	}
+
+	/** Stores the given user value in the original state, wrapped via {@code wrapWithTs}. */
+	@Override
+	public void update(T value) throws IOException {
+		TtlValue<T> stamped = wrapWithTs(value);
+		original.update(stamped);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java
new file mode 100644
index 0000000..878d888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * In-memory test double for {@link InternalKvState}.
+ *
+ * <p>It keeps one value per namespace plus a separate value that is used while no namespace
+ * is set. All serializer related methods return {@code null}.
+ */
+class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
+	private Map<N, T> namespacedValues = new HashMap<>();
+	private T defaultNamespaceValue;
+	private N currentNamespace;
+	private final Supplier<T> emptyValue;
+
+	/** Creates a state whose empty value is {@code null}. */
+	MockInternalKvState() {
+		this(() -> null);
+	}
+
+	/**
+	 * Creates a state using the given supplier to produce the value of an empty state.
+	 *
+	 * @param emptyValue supplier of the empty value
+	 */
+	MockInternalKvState(Supplier<T> emptyValue) {
+		this.emptyValue = emptyValue;
+		this.defaultNamespaceValue = emptyValue.get();
+	}
+
+	@Override
+	public TypeSerializer<K> getKeySerializer() {
+		return null;
+	}
+
+	@Override
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return null;
+	}
+
+	@Override
+	public TypeSerializer<T> getValueSerializer() {
+		return null;
+	}
+
+	@Override
+	public void setCurrentNamespace(N namespace) {
+		this.currentNamespace = namespace;
+	}
+
+	@Override
+	public byte[] getSerializedValue(
+		byte[] serializedKeyAndNamespace,
+		TypeSerializer safeKeySerializer,
+		TypeSerializer safeNamespaceSerializer,
+		TypeSerializer safeValueSerializer) {
+		return null;
+	}
+
+	/** Resets the value of the current namespace, or the default value if no namespace is set. */
+	@Override
+	public void clear() {
+		if (currentNamespace != null) {
+			namespacedValues.remove(currentNamespace);
+			return;
+		}
+		defaultNamespaceValue = emptyValue.get();
+	}
+
+	/**
+	 * Returns the value of the current namespace, falling back to a fresh empty value for an
+	 * unknown namespace. The returned value is written back, so a fallback value becomes the
+	 * stored one.
+	 */
+	public T getInternal() {
+		T value;
+		if (currentNamespace == null) {
+			value = defaultNamespaceValue;
+		} else {
+			value = namespacedValues.getOrDefault(currentNamespace, emptyValue.get());
+		}
+		updateInternal(value);
+		return value;
+	}
+
+	/** Stores the given value for the current namespace, or as default value if none is set. */
+	@SuppressWarnings("WeakerAccess")
+	public void updateInternal(T valueToStore) {
+		if (currentNamespace != null) {
+			namespacedValues.put(currentNamespace, valueToStore);
+			return;
+		}
+		defaultNamespaceValue = valueToStore;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java
new file mode 100644
index 0000000..c548017
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+class MockInternalMapState<K, N, UK, UV>
+	extends MockInternalKvState<K, N, Map<UK, UV>>
+	implements InternalMapState<K, N, UK, UV> {
+
+	MockInternalMapState() {
+		super(HashMap::new);
+	}
+
+	@Override
+	public void clear() {
+		getInternal().clear();
+	}
+
+	@Override
+	public UV get(UK key) {
+		return getInternal().get(key);
+	}
+
+	@Override
+	public void put(UK key, UV value) {
+		this.getInternal().put(key, value);
+	}
+
+	@Override
+	public void putAll(Map<UK, UV> map) {
+		getInternal().putAll(map);
+	}
+
+	@Override
+	public void remove(UK key) {
+		getInternal().remove(key);
+	}
+
+	@Override
+	public boolean contains(UK key) {
+		return getInternal().containsKey(key);
+	}
+
+	@Override
+	public Iterable<Map.Entry<UK, UV>> entries() {
+		return copy().entrySet();
+	}
+
+	private Map<UK, UV> copy() {
+		return new HashMap<>(getInternal());
+	}
+
+	@Override
+	public Iterable<UK> keys() {
+		return copy().keySet();
+	}
+
+	@Override
+	public Iterable<UV> values() {
+		return copy().values();
+	}
+
+	@Override
+	public Iterator<Map.Entry<UK, UV>> iterator() {
+		return entries().iterator();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java
new file mode 100644
index 0000000..582faf3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Collection;
+import java.util.function.Supplier;
+
+/**
+ * Base class for in-memory test doubles of {@link InternalMergingState}; subclasses define
+ * how two accumulators are combined.
+ */
+abstract class MockInternalMergingState<K, N, IN, ACC, OUT>
+	extends MockInternalKvState<K, N, ACC> implements InternalMergingState<K, N, IN, ACC, OUT> {
+
+	MockInternalMergingState() {
+		super();
+	}
+
+	MockInternalMergingState(Supplier<ACC> emptyValue) {
+		super(emptyValue);
+	}
+
+	/**
+	 * Combines the non-null accumulators of all source namespaces via
+	 * {@link #mergeState(Object, Object)} and stores the result under the target namespace.
+	 * Nothing is stored if every source accumulator is {@code null}.
+	 */
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		ACC merged = null;
+		for (N source : sources) {
+			setCurrentNamespace(source);
+			ACC sourceAcc = getInternal();
+			if (sourceAcc == null) {
+				continue;
+			}
+			merged = merged == null ? sourceAcc : mergeState(merged, sourceAcc);
+		}
+		if (merged == null) {
+			return;
+		}
+		setCurrentNamespace(target);
+		updateInternal(merged);
+	}
+
+	/** Combines two non-null accumulators into one. */
+	abstract ACC mergeState(ACC acc, ACC nAcc) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
new file mode 100644
index 0000000..e14c3f8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Time provider for tests which reports the value of the mutable {@link #time} field. */
+class MockTimeProvider implements TtlTimeProvider {
+	/** Timestamp to report; starts at zero and can be set directly. */
+	long time = 0;
+
+	@Override
+	public long currentTimestamp() {
+		return this.time;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
new file mode 100644
index 0000000..477f057
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Test suite for {@link TtlAggregatingState}. */
+public class TtlAggregatingStateTest
+	extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
+	private static final long DEFAULT_ACCUMULATOR = 3L;
+
+	@Override
+	TtlAggregatingState<?, String, Integer, Long, String> createState() {
+		TtlAggregateFunction<Integer, Long, String> ttlAggregateFunction =
+			new TtlAggregateFunction<>(AGGREGATE, ttlConfig, timeProvider);
+		return new TtlAggregatingState<>(
+			new MockInternalTtlAggregatingState<>(ttlAggregateFunction),
+			ttlConfig, timeProvider, null, ttlAggregateFunction);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = v -> ttlState.add(v);
+		getter = () -> ttlState.get();
+		originalGetter = () -> ttlState.original.get();
+
+		updateEmpty = 5;
+		updateUnexpired = 7;
+		updateExpired = 6;
+
+		getUpdateEmpty = "8";
+		getUnexpired = "15";
+		getUpdateExpired = "9";
+	}
+
+	@Override
+	String getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		Set<String> namespaces = new HashSet<>();
+		unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+		finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+		return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) +
+			namespaces.size() * (int) DEFAULT_ACCUMULATOR);
+	}
+
+	private static class MockInternalTtlAggregatingState<K, N, IN, ACC, OUT>
+		extends MockInternalMergingState<K, N, IN, ACC, OUT> implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+		private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
+
+		private MockInternalTtlAggregatingState(AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+			this.aggregateFunction = aggregateFunction;
+		}
+
+		@Override
+		public OUT get() {
+			return aggregateFunction.getResult(getInternal());
+		}
+
+		@Override
+		public void add(IN value) {
+			updateInternal(aggregateFunction.add(value,  getInternal()));
+		}
+
+		@Override
+		ACC mergeState(ACC acc, ACC nAcc) {
+			return aggregateFunction.merge(acc, nAcc);
+		}
+	}
+
+	private static final AggregateFunction<Integer, Long, String> AGGREGATE =
+		new AggregateFunction<Integer, Long, String>() {
+			@Override
+			public Long createAccumulator() {
+				return DEFAULT_ACCUMULATOR;
+			}
+
+			@Override
+			public Long add(Integer value, Long accumulator) {
+				return accumulator + value;
+			}
+
+			@Override
+			public String getResult(Long accumulator) {
+				return accumulator.toString();
+			}
+
+			@Override
+			public Long merge(Long a, Long b) {
+				return a + b;
+			}
+		};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
new file mode 100644
index 0000000..01d5ee1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
+/** Tests {@link TtlFoldingState} on top of a mock folding state, with a string-encoded long sum as the fold function. */
+public class TtlFoldingStateTest extends TtlStateTestBase<TtlFoldingState<?, String, Long, String>, Long, String> {
+	/** Parses the accumulator (null counts as 0), adds the value unless it is null, and renders the sum as a string. */
+	private static final FoldFunction<Long, String> FOLD = (accumulator, value) -> {
+		long sum = accumulator != null ? Long.parseLong(accumulator) : 0L;
+		return Long.toString(value != null ? sum + value : sum);
+	};
+
+	@Override
+	TtlFoldingState<?, String, Long, String> createState() {
+		FoldFunction<Long, TtlValue<String>> ttlFoldFunction = new TtlFoldFunction<>(FOLD, ttlConfig, timeProvider, "1");
+		return new TtlFoldingState<>(new MockInternalFoldingState<>(ttlFoldFunction), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = value -> ttlState.add(value);
+		getter = () -> ttlState.get();
+		originalGetter = () -> ttlState.original.get();
+
+		updateEmpty = 5L;
+		updateUnexpired = 7L;
+		updateExpired = 6L;
+
+		getUpdateEmpty = "6";
+		getUnexpired = "13";
+		getUpdateExpired = "7";
+	}
+
+	private static class MockInternalFoldingState<K, N, T, ACC>
+		extends MockInternalKvState<K, N, ACC> implements InternalFoldingState<K, N, T, ACC> {
+		private final FoldFunction<T, ACC> foldFunction;
+
+		private MockInternalFoldingState(FoldFunction<T, ACC> foldFunction) {
+			this.foldFunction = foldFunction;
+		}
+
+		@Override
+		public ACC get() {
+			return getInternal();
+		}
+
+		@Override
+		public void add(T value) throws Exception {
+			updateInternal(foldFunction.fold(getInternal(), value));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
new file mode 100644
index 0000000..5128ff2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/** Tests {@link TtlListState} on top of an in-memory mock of {@link InternalListState}. */
+public class TtlListStateTest
+	extends TtlMergingStateBase<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
+	@Override
+	TtlListState<?, String, Integer> createState() {
+		return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = elements -> ttlState.addAll(elements);
+		getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
+		originalGetter = () -> ttlState.original.get();
+
+		emptyValue = Collections.emptyList();
+
+		updateEmpty = Arrays.asList(5, 7, 10);
+		updateUnexpired = Arrays.asList(8, 9, 11);
+		updateExpired = Arrays.asList(1, 4);
+
+		getUpdateEmpty = updateEmpty;
+		getUnexpired = updateUnexpired;
+		getUpdateExpired = updateExpired;
+	}
+
+	@Override
+	List<Integer> generateRandomUpdate() {
+		// between 0 and 4 random elements, each in [0, 100); the update may be an empty list
+		return IntStream.range(0, RANDOM.nextInt(5)).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
+	}
+
+	@Override
+	Iterable<Integer> getMergeResult(
+		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
+		// only the final updates make up the expected list; unexpiredUpdatesToMerge is not used here
+		return finalUpdatesToMerge.stream().flatMap(t -> t.f1.stream()).collect(Collectors.toList());
+	}
+
+	/** Mock list state: reads and mutates the list returned by getInternal(); merging concatenates into a new list. */
+	private static class MockInternalListState<K, N, T>
+		extends MockInternalMergingState<K, N, T, List<T>, Iterable<T>>
+		implements InternalListState<K, N, T> {
+
+		MockInternalListState() {
+			super(ArrayList::new);
+		}
+
+		@Override
+		public Iterable<T> get() {
+			return getInternal();
+		}
+
+		@Override
+		public void add(T element) {
+			getInternal().add(element);
+		}
+
+		@Override
+		public void addAll(List<T> elements) {
+			getInternal().addAll(elements);
+		}
+
+		@Override
+		public void update(List<T> elements) {
+			updateInternal(elements);
+		}
+
+		@Override
+		public void clear() {
+			getInternal().clear();
+		}
+
+		@Override
+		List<T> mergeState(List<T> acc, List<T> nAcc) {
+			List<T> merged = new ArrayList<>(acc);
+			merged.addAll(nAcc);
+			return merged;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
new file mode 100644
index 0000000..ac9b038
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Tests the per-entry accessors of {@link TtlMapState} ({@code put}/{@code get}) against a single fixed key. */
+public class TtlMapStatePerElementTest extends TtlStateTestBase<TtlMapState<?, String, Integer, String>, String, String> {
+	private static final int TEST_KEY = 1;
+	private static final String TEST_VAL1 = "test value1";
+	private static final String TEST_VAL2 = "test value2";
+	private static final String TEST_VAL3 = "test value3";
+
+	@Override
+	TtlMapState<?, String, Integer, String> createState() {
+		return new TtlMapState<>(new MockInternalMapState<>(), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = value -> ttlState.put(TEST_KEY, value);
+		getter = () -> ttlState.get(TEST_KEY);
+		originalGetter = () -> ttlState.original.get(TEST_KEY);
+
+		updateEmpty = TEST_VAL1;
+		updateUnexpired = TEST_VAL2;
+		updateExpired = TEST_VAL3;
+		// every expected read is exactly the value written by the corresponding update
+		getUpdateEmpty = updateEmpty;
+		getUnexpired = updateUnexpired;
+		getUpdateExpired = updateExpired;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
new file mode 100644
index 0000000..535962b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Tests the collection-style methods of {@link TtlMapState} ({@code putAll}/{@code entries}). */
+public class TtlMapStateTest extends
+	TtlStateTestBase<TtlMapState<?, String, Integer, String>, Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
+	@Override
+	TtlMapState<?, String, Integer, String> createState() {
+		return new TtlMapState<>(new MockInternalMapState<>(), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = entries -> ttlState.putAll(entries);
+		getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
+		originalGetter = () -> ttlState.original.entries();
+
+		emptyValue = Collections.emptySet();
+
+		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
+		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
+		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+
+		getUpdateEmpty = updateEmpty.entrySet();
+		getUnexpired = updateUnexpired.entrySet();
+		getUpdateExpired = updateExpired.entrySet();
+	}
+
+	/** Collects the given (key, value) tuples into a map; {@code Collectors.toMap} throws IllegalStateException on duplicate keys. */
+	@SafeVarargs
+	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
+		return Arrays.stream(entries).collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
new file mode 100644
index 0000000..6a7aebe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/** Base class for TTL tests of mergeable states; adds a namespace merge test on top of {@link TtlStateTestBase}. */
+abstract class TtlMergingStateBase<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
+	extends TtlStateTestBase<S, UV, GV> {
+	static final Random RANDOM = new Random();
+
+	private static final List<String> NAMESPACES = Arrays.asList(
+		"unsetNamespace1",
+		"unsetNamespace2",
+		"expiredNamespace",
+		"expiredAndUpdatedNamespace",
+		"unexpiredNamespace",
+		"finalNamespace");
+
+	/** Applies update batches at times 0, 120 and 150, merges all namespaces at time 230 and checks the merged value. */
+	@Test
+	public void testMergeNamespaces() throws Exception {
+		initTest();
+
+		timeProvider.time = 0;
+		applyStateUpdates(generateExpiredUpdatesToMerge());
+
+		timeProvider.time = 120;
+		List<Tuple2<String, UV>> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge();
+		applyStateUpdates(unexpiredUpdatesToMerge);
+
+		timeProvider.time = 150;
+		List<Tuple2<String, UV>> finalUpdatesToMerge = generateFinalUpdatesToMerge();
+		applyStateUpdates(finalUpdatesToMerge);
+
+		timeProvider.time = 230;
+		ttlState.mergeNamespaces("targetNamespace", NAMESPACES);
+		ttlState.setCurrentNamespace("targetNamespace");
+		assertEquals("Unexpected result of merge operation",
+			getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get());
+	}
+
+	private List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()));
+	}
+
+	private List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()));
+	}
+
+	private List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate()));
+	}
+
+	abstract UV generateRandomUpdate();
+
+	private void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
+		for (Tuple2<String, UV> update : updates) {
+			ttlState.setCurrentNamespace(update.f0);
+			updater.accept(update.f1);
+		}
+	}
+
+	abstract GV getMergeResult(
+		List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, UV>> finalUpdatesToMerge);
+
+	/** Merge test base for numeric updates: generates random Integer updates and sums update lists as ints. */
+	abstract static class TtlIntegerMergingStateBase<
+		S extends InternalMergingState<?, String, ?, ?, GV>,
+		UV extends Number, GV>
+		extends TtlMergingStateBase<S, UV, GV> {
+		@Override
+		@SuppressWarnings("unchecked")
+		UV generateRandomUpdate() {
+			// unchecked cast: the generated value is always an Integer, so subclasses are expected to bind UV to Integer
+			return (UV) (Integer) RANDOM.nextInt(1000);
+		}
+
+		int getIntegerMergeResult(
+			List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+			List<Tuple2<String, UV>> finalUpdatesToMerge) {
+			return unexpiredUpdatesToMerge.stream().mapToInt(t -> t.f1.intValue()).sum() +
+				finalUpdatesToMerge.stream().mapToInt(t -> t.f1.intValue()).sum();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
new file mode 100644
index 0000000..44043a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.List;
+
+/** Tests {@link TtlReducingState} on top of a mock reducing state, with integer addition as the reduce function. */
+public class TtlReducingStateTest
+	extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlReducingState<?, String, Integer>, Integer, Integer> {
+	/** Adds two integers; when either operand is null the other one is returned as is (null if both are null). */
+	private static final ReduceFunction<Integer> REDUCE = (left, right) -> {
+		if (left == null || right == null) {
+			return left == null ? right : left;
+		}
+		return left + right;
+	};
+
+	/** Wraps {@link #REDUCE} into a {@link TtlReduceFunction} and hands it to the mock state decorated by the TTL state. */
+	@Override
+	TtlReducingState<?, String, Integer> createState() {
+		ReduceFunction<TtlValue<Integer>> ttlReduceFunction = new TtlReduceFunction<>(REDUCE, ttlConfig, timeProvider);
+		return new TtlReducingState<>(new MockInternalReducingState<>(ttlReduceFunction), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = value -> ttlState.add(value);
+		getter = () -> ttlState.get();
+		originalGetter = () -> ttlState.original.get();
+
+		updateEmpty = 5;
+		updateUnexpired = 7;
+		updateExpired = 6;
+
+		// 12 = 5 + 7 under REDUCE; 5 and 6 are the plain updates (presumably read with no live previous value)
+		getUpdateEmpty = 5;
+		getUnexpired = 12;
+		getUpdateExpired = 6;
+	}
+
+	/** Expected merge result: the integer sum computed by {@code getIntegerMergeResult} over both update lists. */
+	@Override
+	Integer getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
+	}
+
+	/** Mock reducing state: combines values with the given {@link ReduceFunction}, both in add and in mergeState. */
+	private static class MockInternalReducingState<K, N, T>
+		extends MockInternalMergingState<K, N, T, T, T> implements InternalReducingState<K, N, T> {
+		private final ReduceFunction<T> reduceFunction;
+
+		private MockInternalReducingState(ReduceFunction<T> reduceFunction) {
+			this.reduceFunction = reduceFunction;
+		}
+
+		@Override
+		public T get() {
+			return getInternal();
+		}
+
+		@Override
+		public void add(T value) throws Exception {
+			T reduced = reduceFunction.reduce(getInternal(), value);
+			updateInternal(reduced);
+		}
+
+		@Override
+		T mergeState(T t, T nAcc) throws Exception {
+			return reduceFunction.reduce(t, nAcc);
+		}
+	}
+}