You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/11 17:17:06 UTC

flink git commit: [FLINK-9511] Implement state TTL configuration

Repository: flink
Updated Branches:
  refs/heads/master 5c43d2b8a -> dc7d81c9c


[FLINK-9511] Implement state TTL configuration

This closes #6277.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc7d81c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc7d81c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc7d81c9

Branch: refs/heads/master
Commit: dc7d81c9c0fdf33b97cdc467e5af064b2097c9de
Parents: 5c43d2b
Author: yanghua <ya...@gmail.com>
Authored: Sat Jul 7 10:53:42 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed Jul 11 19:16:27 2018 +0200

----------------------------------------------------------------------
 .../api/common/state/StateTtlConfiguration.java | 174 +++++++++++++++++++
 .../runtime/state/ttl/AbstractTtlDecorator.java |  11 +-
 .../runtime/state/ttl/AbstractTtlState.java     |   3 +-
 .../runtime/state/ttl/TtlAggregateFunction.java |   3 +-
 .../runtime/state/ttl/TtlAggregatingState.java  |   3 +-
 .../flink/runtime/state/ttl/TtlConfig.java      |  96 ----------
 .../runtime/state/ttl/TtlFoldFunction.java      |   3 +-
 .../runtime/state/ttl/TtlFoldingState.java      |   3 +-
 .../flink/runtime/state/ttl/TtlListState.java   |   3 +-
 .../flink/runtime/state/ttl/TtlMapState.java    |   3 +-
 .../runtime/state/ttl/TtlReduceFunction.java    |   3 +-
 .../runtime/state/ttl/TtlReducingState.java     |   3 +-
 .../runtime/state/ttl/TtlStateFactory.java      |   9 +-
 .../flink/runtime/state/ttl/TtlValueState.java  |   3 +-
 .../runtime/state/ttl/TtlStateTestBase.java     |  30 ++--
 15 files changed, 221 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
new file mode 100644
index 0000000..8ef2046
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
+import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
+import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;
+
+/**
+ * Configuration of state TTL logic.
+ * Instances are created through the {@link Builder} returned by {@link #newBuilder(Time)}.
+ */
+public class StateTtlConfiguration {
+	/**
+	 * This option value configures when to update last access timestamp which prolongs state TTL.
+	 */
+	public enum TtlUpdateType {
+		/** TTL is disabled. State does not expire. */
+		Disabled,
+		/** Last access timestamp is initialised when state is created and updated on every write operation. */
+		OnCreateAndWrite,
+		/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
+		OnReadAndWrite
+	}
+
+	/**
+	 * This option configures whether expired user value can be returned or not.
+	 */
+	public enum TtlStateVisibility {
+		/** Return expired user value if it is not cleaned up yet. */
+		ReturnExpiredIfNotCleanedUp,
+		/** Never return expired user value. */
+		NeverReturnExpired
+	}
+
+	/**
+	 * This option configures time scale to use for ttl.
+	 */
+	public enum TtlTimeCharacteristic {
+		/** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
+		ProcessingTime
+	}
+
+	private final TtlUpdateType ttlUpdateType;
+	private final TtlStateVisibility stateVisibility;
+	private final TtlTimeCharacteristic timeCharacteristic;
+	private final Time ttl;
+
+	private StateTtlConfiguration(
+		TtlUpdateType ttlUpdateType,
+		TtlStateVisibility stateVisibility,
+		TtlTimeCharacteristic timeCharacteristic,
+		Time ttl) {
+		this.ttlUpdateType = Preconditions.checkNotNull(ttlUpdateType);
+		this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
+		this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
+		this.ttl = Preconditions.checkNotNull(ttl);
+		Preconditions.checkArgument(ttl.toMilliseconds() > 0,
+			"TTL is expected to be positive");
+	}
+
+	public TtlUpdateType getTtlUpdateType() {
+		return ttlUpdateType;
+	}
+
+	public TtlStateVisibility getStateVisibility() {
+		return stateVisibility;
+	}
+
+	public Time getTtl() {
+		return ttl;
+	}
+
+	public TtlTimeCharacteristic getTimeCharacteristic() {
+		return timeCharacteristic;
+	}
+
+	@Override
+	public String toString() {
+		return "StateTtlConfiguration{" +
+			"ttlUpdateType=" + ttlUpdateType +
+			", stateVisibility=" + stateVisibility +
+			", timeCharacteristic=" + timeCharacteristic +
+			", ttl=" + ttl +
+			'}';
+	}
+
+	public static Builder newBuilder(Time ttl) {
+		return new Builder(ttl);
+	}
+
+	/**
+	 * Builder for the {@link StateTtlConfiguration}.
+	 */
+	public static class Builder {
+
+		private TtlUpdateType ttlUpdateType = OnCreateAndWrite;
+		private TtlStateVisibility stateVisibility = NeverReturnExpired;
+		private TtlTimeCharacteristic timeCharacteristic = ProcessingTime;
+		private Time ttl;
+
+		public Builder(Time ttl) {
+			this.ttl = ttl;
+		}
+
+		/**
+		 * Sets the ttl update type.
+		 *
+		 * @param ttlUpdateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
+		 */
+		public Builder setTtlUpdateType(TtlUpdateType ttlUpdateType) {
+			this.ttlUpdateType = ttlUpdateType;
+			return this;
+		}
+
+		/**
+		 * Sets the state visibility.
+		 *
+		 * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
+		 */
+		public Builder setStateVisibility(TtlStateVisibility stateVisibility) {
+			this.stateVisibility = stateVisibility;
+			return this;
+		}
+
+		/**
+		 * Sets the time characteristic.
+		 *
+		 * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
+		 */
+		public Builder setTimeCharacteristic(TtlTimeCharacteristic timeCharacteristic) {
+			this.timeCharacteristic = timeCharacteristic;
+			return this;
+		}
+
+		/**
+		 * Sets the ttl time.
+		 * @param ttl The ttl time.
+		 */
+		public Builder setTtl(Time ttl) {
+			this.ttl = ttl;
+			return this;
+		}
+
+		public StateTtlConfiguration build() {
+			return new StateTtlConfiguration(
+				ttlUpdateType,
+				stateVisibility,
+				timeCharacteristic,
+				ttl
+			);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 12efc00..29a575a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -34,7 +35,7 @@ abstract class AbstractTtlDecorator<T> {
 	/** Wrapped original state handler. */
 	final T original;
 
-	final TtlConfig config;
+	final StateTtlConfiguration config;
 
 	final TtlTimeProvider timeProvider;
 
@@ -49,18 +50,18 @@ abstract class AbstractTtlDecorator<T> {
 
 	AbstractTtlDecorator(
 		T original,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider) {
 		Preconditions.checkNotNull(original);
 		Preconditions.checkNotNull(config);
 		Preconditions.checkNotNull(timeProvider);
-		Preconditions.checkArgument(config.getTtlUpdateType() != TtlConfig.TtlUpdateType.Disabled,
+		Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled,
 			"State does not need to be wrapped with TTL if it is configured as disabled.");
 		this.original = original;
 		this.config = config;
 		this.timeProvider = timeProvider;
-		this.updateTsOnRead = config.getTtlUpdateType() == TtlConfig.TtlUpdateType.OnReadAndWrite;
-		this.returnExpired = config.getStateVisibility() == TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
+		this.updateTsOnRead = config.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.OnReadAndWrite;
+		this.returnExpired = config.getStateVisibility() == StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
 		this.ttl = config.getTtl().toMilliseconds();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
index ddab8f4..7903796 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -38,7 +39,7 @@ abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N,
 	implements InternalKvState<K, N, SV> {
 	private final TypeSerializer<SV> valueSerializer;
 
-	AbstractTtlState(S original, TtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
+	AbstractTtlState(S original, StateTtlConfiguration config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
 		super(original, config, timeProvider);
 		this.valueSerializer = valueSerializer;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
index 7d35ef3..5448ba1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -37,7 +38,7 @@ class TtlAggregateFunction<IN, ACC, OUT>
 	ThrowingRunnable<Exception> stateClear;
 	ThrowingConsumer<TtlValue<ACC>, Exception> updater;
 
-	TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, TtlConfig config, TtlTimeProvider timeProvider) {
+	TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, StateTtlConfiguration config, TtlTimeProvider timeProvider) {
 		super(aggFunction, config, timeProvider);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
index 097e49b..a90698e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 
@@ -39,7 +40,7 @@ class TtlAggregatingState<K, N, IN, ACC, OUT>
 
 	TtlAggregatingState(
 		InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider,
 		TypeSerializer<ACC> valueSerializer,
 		TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
deleted file mode 100644
index 17cac49..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Configuration of state TTL logic.
- * TODO: builder
- */
-public class TtlConfig {
-	/**
-	 * This option value configures when to update last access timestamp which prolongs state TTL.
-	 */
-	public enum TtlUpdateType {
-		/** TTL is disabled. State does not expire. */
-		Disabled,
-		/** Last access timestamp is initialised when state is created and updated on every write operation. */
-		OnCreateAndWrite,
-		/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
-		OnReadAndWrite
-	}
-
-	/**
-	 * This option configures whether expired user value can be returned or not.
-	 */
-	public enum TtlStateVisibility {
-		/** Return expired user value if it is not cleaned up yet. */
-		ReturnExpiredIfNotCleanedUp,
-		/** Never return expired user value. */
-		NeverReturnExpired
-	}
-
-	/**
-	 * This option configures time scale to use for ttl.
-	 */
-	public enum TtlTimeCharacteristic {
-		/** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
-		ProcessingTime
-	}
-
-	private final TtlUpdateType ttlUpdateType;
-	private final TtlStateVisibility stateVisibility;
-	private final TtlTimeCharacteristic timeCharacteristic;
-	private final Time ttl;
-
-	public TtlConfig(
-		TtlUpdateType ttlUpdateType,
-		TtlStateVisibility stateVisibility,
-		TtlTimeCharacteristic timeCharacteristic,
-		Time ttl) {
-		Preconditions.checkNotNull(ttlUpdateType);
-		Preconditions.checkNotNull(stateVisibility);
-		Preconditions.checkNotNull(timeCharacteristic);
-		Preconditions.checkNotNull(ttl);
-		Preconditions.checkArgument(ttl.toMilliseconds() > 0,
-			"TTL is expected to be positive");
-		this.ttlUpdateType = ttlUpdateType;
-		this.stateVisibility = stateVisibility;
-		this.timeCharacteristic = timeCharacteristic;
-		this.ttl = ttl;
-	}
-
-	public TtlUpdateType getTtlUpdateType() {
-		return ttlUpdateType;
-	}
-
-	public TtlStateVisibility getStateVisibility() {
-		return stateVisibility;
-	}
-
-	public Time getTtl() {
-		return ttl;
-	}
-
-	public TtlTimeCharacteristic getTimeCharacteristic() {
-		return timeCharacteristic;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
index 8d54d87..c7305bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 
 /**
  * This class wraps folding function with TTL logic.
@@ -35,7 +36,7 @@ class TtlFoldFunction<T, ACC>
 	private final ACC defaultAccumulator;
 
 	TtlFoldFunction(
-		FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
+		FoldFunction<T, ACC> original, StateTtlConfiguration config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
 		super(original, config, timeProvider);
 		this.defaultAccumulator = defaultAccumulator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
index 46f56d2..c3a75e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 
@@ -36,7 +37,7 @@ class TtlFoldingState<K, N, T, ACC>
 	implements InternalFoldingState<K, N, T, ACC> {
 	TtlFoldingState(
 		InternalFoldingState<K, N, T, TtlValue<ACC>> originalState,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider,
 		TypeSerializer<ACC> valueSerializer) {
 		super(originalState, config, timeProvider, valueSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index 6da0d8f..77e92f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
@@ -42,7 +43,7 @@ class TtlListState<K, N, T> extends
 	implements InternalListState<K, N, T> {
 	TtlListState(
 		InternalListState<K, N, TtlValue<T>> originalState,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider,
 		TypeSerializer<List<T>> valueSerializer) {
 		super(originalState, config, timeProvider, valueSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 352c6c9..98d7c52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -43,7 +44,7 @@ class TtlMapState<K, N, UK, UV>
 	implements InternalMapState<K, N, UK, UV> {
 	TtlMapState(
 		InternalMapState<K, N, UK, TtlValue<UV>> original,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider,
 		TypeSerializer<Map<UK, UV>> valueSerializer) {
 		super(original, config, timeProvider, valueSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
index de593c5..fa7c8bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 
 /**
  * This class wraps reducing function with TTL logic.
@@ -31,7 +32,7 @@ class TtlReduceFunction<T>
 
 	TtlReduceFunction(
 		ReduceFunction<T> originalReduceFunction,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider) {
 		super(originalReduceFunction, config, timeProvider);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
index 9169d45..01e4be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 
@@ -35,7 +36,7 @@ class TtlReducingState<K, N, T>
 	implements InternalReducingState<K, N, T> {
 	TtlReducingState(
 		InternalReducingState<K, N, TtlValue<T>> originalState,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider,
 		TypeSerializer<T> valueSerializer) {
 		super(originalState, config, timeProvider, valueSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 2f7593a..82096a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompositeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -48,14 +49,14 @@ public class TtlStateFactory {
 		TypeSerializer<N> namespaceSerializer,
 		StateDescriptor<S, SV> stateDesc,
 		KeyedStateFactory originalStateFactory,
-		TtlConfig ttlConfig,
+		StateTtlConfiguration ttlConfig,
 		TtlTimeProvider timeProvider) throws Exception {
 		Preconditions.checkNotNull(namespaceSerializer);
 		Preconditions.checkNotNull(stateDesc);
 		Preconditions.checkNotNull(originalStateFactory);
 		Preconditions.checkNotNull(ttlConfig);
 		Preconditions.checkNotNull(timeProvider);
-		return ttlConfig.getTtlUpdateType() == TtlConfig.TtlUpdateType.Disabled ?
+		return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ?
 			originalStateFactory.createState(namespaceSerializer, stateDesc) :
 			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
 				.createState(namespaceSerializer, stateDesc);
@@ -64,10 +65,10 @@ public class TtlStateFactory {
 	private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> stateFactories;
 
 	private final KeyedStateFactory originalStateFactory;
-	private final TtlConfig ttlConfig;
+	private final StateTtlConfiguration ttlConfig;
 	private final TtlTimeProvider timeProvider;
 
-	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+	private TtlStateFactory(KeyedStateFactory originalStateFactory, StateTtlConfiguration ttlConfig, TtlTimeProvider timeProvider) {
 		this.originalStateFactory = originalStateFactory;
 		this.ttlConfig = ttlConfig;
 		this.timeProvider = timeProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
index 9feb62d..c14a583 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 
@@ -35,7 +36,7 @@ class TtlValueState<K, N, T>
 	implements InternalValueState<K, N, T> {
 	TtlValueState(
 		InternalValueState<K, N, TtlValue<T>> originalState,
-		TtlConfig config,
+		StateTtlConfiguration config,
 		TtlTimeProvider timeProvider,
 		TypeSerializer<T> valueSerializer) {
 		super(originalState, config, timeProvider, valueSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 13eac32..bc3d6e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl;
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.state.KeyedStateFactory;
@@ -39,7 +40,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 
 	S ttlState;
 	MockTimeProvider timeProvider;
-	TtlConfig ttlConfig;
+	StateTtlConfiguration ttlConfig;
 
 	ThrowingConsumer<UV, Exception> updater;
 	SupplierWithException<GV, Exception> getter;
@@ -56,20 +57,21 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 	GV emptyValue = null;
 
 	void initTest() {
-		initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
+		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 	}
 
-	private void initTest(TtlConfig.TtlUpdateType updateType, TtlConfig.TtlStateVisibility visibility) {
+	private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
 		initTest(updateType, visibility, TTL);
 	}
 
-	private void initTest(TtlConfig.TtlUpdateType updateType, TtlConfig.TtlStateVisibility visibility, long ttl) {
+	private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
 		timeProvider = new MockTimeProvider();
-		ttlConfig = new TtlConfig(
-			updateType,
-			visibility,
-			TtlConfig.TtlTimeCharacteristic.ProcessingTime,
-			Time.milliseconds(ttl));
+		StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
+		ttlConfigBuilder.setTtlUpdateType(updateType)
+						.setStateVisibility(visibility)
+						.setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
+						.setTtl(Time.milliseconds(ttl));
+		ttlConfig = ttlConfigBuilder.build();
 		ttlState = createState();
 		initTestValues();
 	}
@@ -96,7 +98,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 
 	@Test
 	public void testExactExpirationOnWrite() throws Exception {
-		initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
+		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 
 		timeProvider.time = 0;
 		updater.accept(updateEmpty);
@@ -123,7 +125,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 
 	@Test
 	public void testRelaxedExpirationOnWrite() throws Exception {
-		initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
 
 		timeProvider.time = 0;
 		updater.accept(updateEmpty);
@@ -135,7 +137,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 
 	@Test
 	public void testExactExpirationOnRead() throws Exception {
-		initTest(TtlConfig.TtlUpdateType.OnReadAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
+		initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
 
 		timeProvider.time = 0;
 		updater.accept(updateEmpty);
@@ -153,7 +155,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 
 	@Test
 	public void testRelaxedExpirationOnRead() throws Exception {
-		initTest(TtlConfig.TtlUpdateType.OnReadAndWrite, TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+		initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
 
 		timeProvider.time = 0;
 		updater.accept(updateEmpty);
@@ -168,7 +170,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
 
 	@Test
 	public void testExpirationTimestampOverflow() throws Exception {
-		initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
+		initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
 
 		timeProvider.time = 10;
 		updater.accept(updateEmpty);