You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:32 UTC

[12/12] flink git commit: [FLINK-9701] [state] (follow up) Use StateTtlConfiguration.DISABLED instead of null, make it Serializable and add convenience methods to its builder

[FLINK-9701] [state] (follow up) Use StateTtlConfiguration.DISABLED instead of null, make it Serializable and add
convenience methods to its builder

This closes #6331.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1632681e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1632681e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1632681e

Branch: refs/heads/master
Commit: 1632681e41cbc1092a6b4d47a58adfffba6af5d4
Parents: 57872d5
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Thu Jul 12 17:12:18 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:32:57 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java | 14 +++++---
 .../api/common/state/StateTtlConfiguration.java | 36 +++++++++++++++++++-
 .../runtime/state/ttl/TtlStateFactory.java      |  6 ++--
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 956fd05..191eb6f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
@@ -94,8 +95,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	private String queryableStateName;
 
 	/** Name for queries against state created from this StateDescriptor. */
-	@Nullable
-	private StateTtlConfiguration ttlConfig;
+	@Nonnull
+	private StateTtlConfiguration ttlConfig = StateTtlConfiguration.DISABLED;
 
 	/** The default value returned by the state when no other value is bound to a key. */
 	@Nullable
@@ -208,7 +209,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 * @throws IllegalStateException If queryable state name already set
 	 */
 	public void setQueryable(String queryableStateName) {
-		Preconditions.checkArgument(ttlConfig == null,
+		Preconditions.checkArgument(
+			ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled,
 			"Queryable state is currently not supported with TTL");
 		if (this.queryableStateName == null) {
 			this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
@@ -247,12 +249,14 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 */
 	public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
 		Preconditions.checkNotNull(ttlConfig);
-		Preconditions.checkArgument(queryableStateName == null,
+		Preconditions.checkArgument(
+			ttlConfig.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled &&
+				queryableStateName == null,
 			"Queryable state is currently not supported with TTL");
 		this.ttlConfig = ttlConfig;
 	}
 
-	@Nullable
+	@Nonnull
 	@Internal
 	public StateTtlConfiguration getTtlConfig() {
 		return ttlConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
index 9bd8b15..55ec29c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
+
 import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
 import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
 import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;
@@ -28,11 +30,19 @@ import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT
 /**
  * Configuration of state TTL logic.
  */
-public class StateTtlConfiguration {
+public class StateTtlConfiguration implements Serializable {
+
+	private static final long serialVersionUID = -7592693245044289793L;
+
+	public static final StateTtlConfiguration DISABLED =
+		newBuilder(Time.milliseconds(Long.MAX_VALUE)).setTtlUpdateType(TtlUpdateType.Disabled).build();
+
 	/**
 	 * This option value configures when to update last access timestamp which prolongs state TTL.
 	 */
 	public enum TtlUpdateType {
+		/** TTL is disabled. State does not expire. */
+		Disabled,
 		/** Last access timestamp is initialised when state is created and updated on every write operation. */
 		OnCreateAndWrite,
 		/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
@@ -91,6 +101,10 @@ public class StateTtlConfiguration {
 		return timeCharacteristic;
 	}
 
+	public boolean isEnabled() {
+		return ttlUpdateType != TtlUpdateType.Disabled;
+	}
+
 	@Override
 	public String toString() {
 		return "StateTtlConfiguration{" +
@@ -129,6 +143,14 @@ public class StateTtlConfiguration {
 			return this;
 		}
 
+		public Builder updateTtlOnCreateAndWrite() {
+			return setTtlUpdateType(TtlUpdateType.OnCreateAndWrite);
+		}
+
+		public Builder updateTtlOnReadAndWrite() {
+			return setTtlUpdateType(TtlUpdateType.OnReadAndWrite);
+		}
+
 		/**
 		 * Sets the state visibility.
 		 *
@@ -139,6 +161,14 @@ public class StateTtlConfiguration {
 			return this;
 		}
 
+		public Builder returnExpiredIfNotCleanedUp() {
+			return setStateVisibility(TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+		}
+
+		public Builder neverReturnExpired() {
+			return setStateVisibility(TtlStateVisibility.NeverReturnExpired);
+		}
+
 		/**
 		 * Sets the time characteristic.
 		 *
@@ -149,6 +179,10 @@ public class StateTtlConfiguration {
 			return this;
 		}
 
+		public Builder useProcessingTime() {
+			return setTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime);
+		}
+
 		/**
 		 * Sets the ttl time.
 		 * @param ttl The ttl time.

http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 5909ac7..e12ba90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -54,10 +54,10 @@ public class TtlStateFactory {
 		Preconditions.checkNotNull(stateDesc);
 		Preconditions.checkNotNull(originalStateFactory);
 		Preconditions.checkNotNull(timeProvider);
-		return stateDesc.getTtlConfig() == null ?
-			originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
+		return  stateDesc.getTtlConfig().isEnabled() ?
 			new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider)
-				.createState(namespaceSerializer, stateDesc);
+				.createState(namespaceSerializer, stateDesc) :
+			originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
 	}
 
 	private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> stateFactories;