You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:32 UTC
[12/12] flink git commit: [FLINK-9701] [state] (follow up) Use
StateTtlConfiguration.DISABLED instead of null,
make it Serializable and add convenience methods to its builder
[FLINK-9701] [state] (follow up) Use StateTtlConfiguration.DISABLED instead of null, make it Serializable and add
convenience methods to its builder
This closes #6331.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1632681e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1632681e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1632681e
Branch: refs/heads/master
Commit: 1632681e41cbc1092a6b4d47a58adfffba6af5d4
Parents: 57872d5
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Thu Jul 12 17:12:18 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:32:57 2018 +0200
----------------------------------------------------------------------
.../flink/api/common/state/StateDescriptor.java | 14 +++++---
.../api/common/state/StateTtlConfiguration.java | 36 +++++++++++++++++++-
.../runtime/state/ttl/TtlStateFactory.java | 6 ++--
3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 956fd05..191eb6f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
@@ -94,8 +95,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
private String queryableStateName;
/** Name for queries against state created from this StateDescriptor. */
- @Nullable
- private StateTtlConfiguration ttlConfig;
+ @Nonnull
+ private StateTtlConfiguration ttlConfig = StateTtlConfiguration.DISABLED;
/** The default value returned by the state when no other value is bound to a key. */
@Nullable
@@ -208,7 +209,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
* @throws IllegalStateException If queryable state name already set
*/
public void setQueryable(String queryableStateName) {
- Preconditions.checkArgument(ttlConfig == null,
+ Preconditions.checkArgument(
+ ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled,
"Queryable state is currently not supported with TTL");
if (this.queryableStateName == null) {
this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
@@ -247,12 +249,14 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
*/
public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
Preconditions.checkNotNull(ttlConfig);
- Preconditions.checkArgument(queryableStateName == null,
+ Preconditions.checkArgument(
+ ttlConfig.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled &&
+ queryableStateName == null,
"Queryable state is currently not supported with TTL");
this.ttlConfig = ttlConfig;
}
- @Nullable
+ @Nonnull
@Internal
public StateTtlConfiguration getTtlConfig() {
return ttlConfig;
http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
index 9bd8b15..55ec29c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;
+import java.io.Serializable;
+
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;
@@ -28,11 +30,19 @@ import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT
/**
* Configuration of state TTL logic.
*/
-public class StateTtlConfiguration {
+public class StateTtlConfiguration implements Serializable {
+
+ private static final long serialVersionUID = -7592693245044289793L;
+
+ public static final StateTtlConfiguration DISABLED =
+ newBuilder(Time.milliseconds(Long.MAX_VALUE)).setTtlUpdateType(TtlUpdateType.Disabled).build();
+
/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
public enum TtlUpdateType {
+ /** TTL is disabled. State does not expire. */
+ Disabled,
/** Last access timestamp is initialised when state is created and updated on every write operation. */
OnCreateAndWrite,
/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
@@ -91,6 +101,10 @@ public class StateTtlConfiguration {
return timeCharacteristic;
}
+ public boolean isEnabled() {
+ return ttlUpdateType != TtlUpdateType.Disabled;
+ }
+
@Override
public String toString() {
return "StateTtlConfiguration{" +
@@ -129,6 +143,14 @@ public class StateTtlConfiguration {
return this;
}
+ public Builder updateTtlOnCreateAndWrite() {
+ return setTtlUpdateType(TtlUpdateType.OnCreateAndWrite);
+ }
+
+ public Builder updateTtlOnReadAndWrite() {
+ return setTtlUpdateType(TtlUpdateType.OnReadAndWrite);
+ }
+
/**
* Sets the state visibility.
*
@@ -139,6 +161,14 @@ public class StateTtlConfiguration {
return this;
}
+ public Builder returnExpiredIfNotCleanedUp() {
+ return setStateVisibility(TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+ }
+
+ public Builder neverReturnExpired() {
+ return setStateVisibility(TtlStateVisibility.NeverReturnExpired);
+ }
+
/**
* Sets the time characteristic.
*
@@ -149,6 +179,10 @@ public class StateTtlConfiguration {
return this;
}
+ public Builder useProcessingTime() {
+ return setTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime);
+ }
+
/**
* Sets the ttl time.
* @param ttl The ttl time.
http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 5909ac7..e12ba90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -54,10 +54,10 @@ public class TtlStateFactory {
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(originalStateFactory);
Preconditions.checkNotNull(timeProvider);
- return stateDesc.getTtlConfig() == null ?
- originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
+ return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider)
- .createState(namespaceSerializer, stateDesc);
+ .createState(namespaceSerializer, stateDesc) :
+ originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
}
private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> stateFactories;