You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/11 17:17:06 UTC
flink git commit: [FLINK-9511] Implement state TTL configuration
Repository: flink
Updated Branches:
refs/heads/master 5c43d2b8a -> dc7d81c9c
[FLINK-9511] Implement state TTL configuration
This closes #6277.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc7d81c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc7d81c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc7d81c9
Branch: refs/heads/master
Commit: dc7d81c9c0fdf33b97cdc467e5af064b2097c9de
Parents: 5c43d2b
Author: yanghua <ya...@gmail.com>
Authored: Sat Jul 7 10:53:42 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed Jul 11 19:16:27 2018 +0200
----------------------------------------------------------------------
.../api/common/state/StateTtlConfiguration.java | 174 +++++++++++++++++++
.../runtime/state/ttl/AbstractTtlDecorator.java | 11 +-
.../runtime/state/ttl/AbstractTtlState.java | 3 +-
.../runtime/state/ttl/TtlAggregateFunction.java | 3 +-
.../runtime/state/ttl/TtlAggregatingState.java | 3 +-
.../flink/runtime/state/ttl/TtlConfig.java | 96 ----------
.../runtime/state/ttl/TtlFoldFunction.java | 3 +-
.../runtime/state/ttl/TtlFoldingState.java | 3 +-
.../flink/runtime/state/ttl/TtlListState.java | 3 +-
.../flink/runtime/state/ttl/TtlMapState.java | 3 +-
.../runtime/state/ttl/TtlReduceFunction.java | 3 +-
.../runtime/state/ttl/TtlReducingState.java | 3 +-
.../runtime/state/ttl/TtlStateFactory.java | 9 +-
.../flink/runtime/state/ttl/TtlValueState.java | 3 +-
.../runtime/state/ttl/TtlStateTestBase.java | 30 ++--
15 files changed, 221 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
new file mode 100644
index 0000000..8ef2046
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
+import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
+import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;
+
+/**
+ * Configuration of state TTL logic.
+ * Instances are created through the {@link Builder} returned by {@link #newBuilder(Time)}.
+ */
+public class StateTtlConfiguration {
+ /**
+ * This option value configures when to update last access timestamp which prolongs state TTL.
+ */
+ public enum TtlUpdateType {
+ /** TTL is disabled. State does not expire. */
+ Disabled,
+ /** Last access timestamp is initialised when state is created and updated on every write operation. */
+ OnCreateAndWrite,
+ /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
+ OnReadAndWrite
+ }
+
+ /**
+ * This option configures whether expired user value can be returned or not.
+ */
+ public enum TtlStateVisibility {
+ /** Return expired user value if it is not cleaned up yet. */
+ ReturnExpiredIfNotCleanedUp,
+ /** Never return expired user value. */
+ NeverReturnExpired
+ }
+
+ /**
+ * This option configures time scale to use for ttl.
+ */
+ public enum TtlTimeCharacteristic {
+ /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
+ ProcessingTime
+ }
+
+ private final TtlUpdateType ttlUpdateType;
+ private final TtlStateVisibility stateVisibility;
+ private final TtlTimeCharacteristic timeCharacteristic;
+ private final Time ttl;
+
+ private StateTtlConfiguration(
+ TtlUpdateType ttlUpdateType,
+ TtlStateVisibility stateVisibility,
+ TtlTimeCharacteristic timeCharacteristic,
+ Time ttl) {
+ this.ttlUpdateType = Preconditions.checkNotNull(ttlUpdateType);
+ this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
+ this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
+ this.ttl = Preconditions.checkNotNull(ttl);
+ Preconditions.checkArgument(ttl.toMilliseconds() > 0,
+ "TTL is expected to be positive");
+ }
+
+ public TtlUpdateType getTtlUpdateType() {
+ return ttlUpdateType;
+ }
+
+ public TtlStateVisibility getStateVisibility() {
+ return stateVisibility;
+ }
+
+ public Time getTtl() {
+ return ttl;
+ }
+
+ public TtlTimeCharacteristic getTimeCharacteristic() {
+ return timeCharacteristic;
+ }
+
+ @Override
+ public String toString() {
+ return "StateTtlConfiguration{" +
+ "ttlUpdateType=" + ttlUpdateType +
+ ", stateVisibility=" + stateVisibility +
+ ", timeCharacteristic=" + timeCharacteristic +
+ ", ttl=" + ttl +
+ '}';
+ }
+
+ public static Builder newBuilder(Time ttl) {
+ return new Builder(ttl);
+ }
+
+ /**
+ * Builder for the {@link StateTtlConfiguration}.
+ */
+ public static class Builder {
+
+ private TtlUpdateType ttlUpdateType = OnCreateAndWrite;
+ private TtlStateVisibility stateVisibility = NeverReturnExpired;
+ private TtlTimeCharacteristic timeCharacteristic = ProcessingTime;
+ private Time ttl;
+
+ public Builder(Time ttl) {
+ this.ttl = ttl;
+ }
+
+ /**
+ * Sets the ttl update type.
+ *
+ * @param ttlUpdateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
+ */
+ public Builder setTtlUpdateType(TtlUpdateType ttlUpdateType) {
+ this.ttlUpdateType = ttlUpdateType;
+ return this;
+ }
+
+ /**
+ * Sets the state visibility.
+ *
+ * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
+ */
+ public Builder setStateVisibility(TtlStateVisibility stateVisibility) {
+ this.stateVisibility = stateVisibility;
+ return this;
+ }
+
+ /**
+ * Sets the time characteristic.
+ *
+ * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
+ */
+ public Builder setTimeCharacteristic(TtlTimeCharacteristic timeCharacteristic) {
+ this.timeCharacteristic = timeCharacteristic;
+ return this;
+ }
+
+ /**
+ * Sets the ttl time.
+ * @param ttl The ttl time.
+ */
+ public Builder setTtl(Time ttl) {
+ this.ttl = ttl;
+ return this;
+ }
+
+ public StateTtlConfiguration build() {
+ return new StateTtlConfiguration(
+ ttlUpdateType,
+ stateVisibility,
+ timeCharacteristic,
+ ttl
+ );
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 12efc00..29a575a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -34,7 +35,7 @@ abstract class AbstractTtlDecorator<T> {
/** Wrapped original state handler. */
final T original;
- final TtlConfig config;
+ final StateTtlConfiguration config;
final TtlTimeProvider timeProvider;
@@ -49,18 +50,18 @@ abstract class AbstractTtlDecorator<T> {
AbstractTtlDecorator(
T original,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider) {
Preconditions.checkNotNull(original);
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(timeProvider);
- Preconditions.checkArgument(config.getTtlUpdateType() != TtlConfig.TtlUpdateType.Disabled,
+ Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled,
"State does not need to be wrapped with TTL if it is configured as disabled.");
this.original = original;
this.config = config;
this.timeProvider = timeProvider;
- this.updateTsOnRead = config.getTtlUpdateType() == TtlConfig.TtlUpdateType.OnReadAndWrite;
- this.returnExpired = config.getStateVisibility() == TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
+ this.updateTsOnRead = config.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.OnReadAndWrite;
+ this.returnExpired = config.getStateVisibility() == StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
this.ttl = config.getTtl().toMilliseconds();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
index ddab8f4..7903796 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -38,7 +39,7 @@ abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N,
implements InternalKvState<K, N, SV> {
private final TypeSerializer<SV> valueSerializer;
- AbstractTtlState(S original, TtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
+ AbstractTtlState(S original, StateTtlConfiguration config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
super(original, config, timeProvider);
this.valueSerializer = valueSerializer;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
index 7d35ef3..5448ba1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -37,7 +38,7 @@ class TtlAggregateFunction<IN, ACC, OUT>
ThrowingRunnable<Exception> stateClear;
ThrowingConsumer<TtlValue<ACC>, Exception> updater;
- TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, TtlConfig config, TtlTimeProvider timeProvider) {
+ TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, StateTtlConfiguration config, TtlTimeProvider timeProvider) {
super(aggFunction, config, timeProvider);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
index 097e49b..a90698e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
@@ -39,7 +40,7 @@ class TtlAggregatingState<K, N, IN, ACC, OUT>
TtlAggregatingState(
InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<ACC> valueSerializer,
TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
deleted file mode 100644
index 17cac49..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Configuration of state TTL logic.
- * TODO: builder
- */
-public class TtlConfig {
- /**
- * This option value configures when to update last access timestamp which prolongs state TTL.
- */
- public enum TtlUpdateType {
- /** TTL is disabled. State does not expire. */
- Disabled,
- /** Last access timestamp is initialised when state is created and updated on every write operation. */
- OnCreateAndWrite,
- /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
- OnReadAndWrite
- }
-
- /**
- * This option configures whether expired user value can be returned or not.
- */
- public enum TtlStateVisibility {
- /** Return expired user value if it is not cleaned up yet. */
- ReturnExpiredIfNotCleanedUp,
- /** Never return expired user value. */
- NeverReturnExpired
- }
-
- /**
- * This option configures time scale to use for ttl.
- */
- public enum TtlTimeCharacteristic {
- /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
- ProcessingTime
- }
-
- private final TtlUpdateType ttlUpdateType;
- private final TtlStateVisibility stateVisibility;
- private final TtlTimeCharacteristic timeCharacteristic;
- private final Time ttl;
-
- public TtlConfig(
- TtlUpdateType ttlUpdateType,
- TtlStateVisibility stateVisibility,
- TtlTimeCharacteristic timeCharacteristic,
- Time ttl) {
- Preconditions.checkNotNull(ttlUpdateType);
- Preconditions.checkNotNull(stateVisibility);
- Preconditions.checkNotNull(timeCharacteristic);
- Preconditions.checkNotNull(ttl);
- Preconditions.checkArgument(ttl.toMilliseconds() > 0,
- "TTL is expected to be positive");
- this.ttlUpdateType = ttlUpdateType;
- this.stateVisibility = stateVisibility;
- this.timeCharacteristic = timeCharacteristic;
- this.ttl = ttl;
- }
-
- public TtlUpdateType getTtlUpdateType() {
- return ttlUpdateType;
- }
-
- public TtlStateVisibility getStateVisibility() {
- return stateVisibility;
- }
-
- public Time getTtl() {
- return ttl;
- }
-
- public TtlTimeCharacteristic getTimeCharacteristic() {
- return timeCharacteristic;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
index 8d54d87..c7305bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
/**
* This class wraps folding function with TTL logic.
@@ -35,7 +36,7 @@ class TtlFoldFunction<T, ACC>
private final ACC defaultAccumulator;
TtlFoldFunction(
- FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
+ FoldFunction<T, ACC> original, StateTtlConfiguration config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
super(original, config, timeProvider);
this.defaultAccumulator = defaultAccumulator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
index 46f56d2..c3a75e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -36,7 +37,7 @@ class TtlFoldingState<K, N, T, ACC>
implements InternalFoldingState<K, N, T, ACC> {
TtlFoldingState(
InternalFoldingState<K, N, T, TtlValue<ACC>> originalState,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<ACC> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index 6da0d8f..77e92f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
@@ -42,7 +43,7 @@ class TtlListState<K, N, T> extends
implements InternalListState<K, N, T> {
TtlListState(
InternalListState<K, N, TtlValue<T>> originalState,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<List<T>> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 352c6c9..98d7c52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -43,7 +44,7 @@ class TtlMapState<K, N, UK, UV>
implements InternalMapState<K, N, UK, UV> {
TtlMapState(
InternalMapState<K, N, UK, TtlValue<UV>> original,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<Map<UK, UV>> valueSerializer) {
super(original, config, timeProvider, valueSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
index de593c5..fa7c8bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
/**
* This class wraps reducing function with TTL logic.
@@ -31,7 +32,7 @@ class TtlReduceFunction<T>
TtlReduceFunction(
ReduceFunction<T> originalReduceFunction,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider) {
super(originalReduceFunction, config, timeProvider);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
index 9169d45..01e4be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -35,7 +36,7 @@ class TtlReducingState<K, N, T>
implements InternalReducingState<K, N, T> {
TtlReducingState(
InternalReducingState<K, N, TtlValue<T>> originalState,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<T> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 2f7593a..82096a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -48,14 +49,14 @@ public class TtlStateFactory {
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateFactory originalStateFactory,
- TtlConfig ttlConfig,
+ StateTtlConfiguration ttlConfig,
TtlTimeProvider timeProvider) throws Exception {
Preconditions.checkNotNull(namespaceSerializer);
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(originalStateFactory);
Preconditions.checkNotNull(ttlConfig);
Preconditions.checkNotNull(timeProvider);
- return ttlConfig.getTtlUpdateType() == TtlConfig.TtlUpdateType.Disabled ?
+ return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ?
originalStateFactory.createState(namespaceSerializer, stateDesc) :
new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
.createState(namespaceSerializer, stateDesc);
@@ -64,10 +65,10 @@ public class TtlStateFactory {
private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> stateFactories;
private final KeyedStateFactory originalStateFactory;
- private final TtlConfig ttlConfig;
+ private final StateTtlConfiguration ttlConfig;
private final TtlTimeProvider timeProvider;
- private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+ private TtlStateFactory(KeyedStateFactory originalStateFactory, StateTtlConfiguration ttlConfig, TtlTimeProvider timeProvider) {
this.originalStateFactory = originalStateFactory;
this.ttlConfig = ttlConfig;
this.timeProvider = timeProvider;
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
index 9feb62d..c14a583 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
@@ -35,7 +36,7 @@ class TtlValueState<K, N, T>
implements InternalValueState<K, N, T> {
TtlValueState(
InternalValueState<K, N, TtlValue<T>> originalState,
- TtlConfig config,
+ StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<T> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d81c9/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 13eac32..bc3d6e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.KeyedStateFactory;
@@ -39,7 +40,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
S ttlState;
MockTimeProvider timeProvider;
- TtlConfig ttlConfig;
+ StateTtlConfiguration ttlConfig;
ThrowingConsumer<UV, Exception> updater;
SupplierWithException<GV, Exception> getter;
@@ -56,20 +57,21 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
GV emptyValue = null;
void initTest() {
- initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
+ initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
}
- private void initTest(TtlConfig.TtlUpdateType updateType, TtlConfig.TtlStateVisibility visibility) {
+ private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
initTest(updateType, visibility, TTL);
}
- private void initTest(TtlConfig.TtlUpdateType updateType, TtlConfig.TtlStateVisibility visibility, long ttl) {
+ private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
timeProvider = new MockTimeProvider();
- ttlConfig = new TtlConfig(
- updateType,
- visibility,
- TtlConfig.TtlTimeCharacteristic.ProcessingTime,
- Time.milliseconds(ttl));
+ StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
+ ttlConfigBuilder.setTtlUpdateType(updateType)
+ .setStateVisibility(visibility)
+ .setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
+ .setTtl(Time.milliseconds(ttl));
+ ttlConfig = ttlConfigBuilder.build();
ttlState = createState();
initTestValues();
}
@@ -96,7 +98,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testExactExpirationOnWrite() throws Exception {
- initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
+ initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
timeProvider.time = 0;
updater.accept(updateEmpty);
@@ -123,7 +125,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testRelaxedExpirationOnWrite() throws Exception {
- initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+ initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
updater.accept(updateEmpty);
@@ -135,7 +137,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testExactExpirationOnRead() throws Exception {
- initTest(TtlConfig.TtlUpdateType.OnReadAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
+ initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
timeProvider.time = 0;
updater.accept(updateEmpty);
@@ -153,7 +155,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testRelaxedExpirationOnRead() throws Exception {
- initTest(TtlConfig.TtlUpdateType.OnReadAndWrite, TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+ initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
updater.accept(updateEmpty);
@@ -168,7 +170,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testExpirationTimestampOverflow() throws Exception {
- initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
+ initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
timeProvider.time = 10;
updater.accept(updateEmpty);