You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/06/29 12:20:23 UTC
[2/2] flink git commit: [FLINK-9514, FLINK-9515,
FLINK-9516] Introduce wrapper to enhance state with ttl
[FLINK-9514,FLINK-9515,FLINK-9516] Introduce wrapper to enhance state with ttl
This closes #6186.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c13e00c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c13e00c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c13e00c
Branch: refs/heads/master
Commit: 2c13e00c1c108163466f9944de2a32dd5a1b7352
Parents: 92a6a22
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Mon Jun 4 17:28:40 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jun 29 14:18:59 2018 +0200
----------------------------------------------------------------------
.../runtime/state/ttl/AbstractTtlDecorator.java | 110 ++++++++++++
.../runtime/state/ttl/AbstractTtlState.java | 85 +++++++++
.../runtime/state/ttl/TtlAggregateFunction.java | 83 +++++++++
.../runtime/state/ttl/TtlAggregatingState.java | 80 +++++++++
.../flink/runtime/state/ttl/TtlConfig.java | 96 +++++++++++
.../runtime/state/ttl/TtlFoldFunction.java | 49 ++++++
.../runtime/state/ttl/TtlFoldingState.java | 69 ++++++++
.../flink/runtime/state/ttl/TtlListState.java | 172 +++++++++++++++++++
.../flink/runtime/state/ttl/TtlMapState.java | 134 +++++++++++++++
.../runtime/state/ttl/TtlReduceFunction.java | 53 ++++++
.../runtime/state/ttl/TtlReducingState.java | 73 ++++++++
.../runtime/state/ttl/TtlTimeProvider.java | 26 +++
.../flink/runtime/state/ttl/TtlValue.java | 47 +++++
.../flink/runtime/state/ttl/TtlValueState.java | 53 ++++++
.../runtime/state/ttl/MockInternalKvState.java | 96 +++++++++++
.../runtime/state/ttl/MockInternalMapState.java | 88 ++++++++++
.../state/ttl/MockInternalMergingState.java | 52 ++++++
.../runtime/state/ttl/MockTimeProvider.java | 28 +++
.../state/ttl/TtlAggregatingStateTest.java | 115 +++++++++++++
.../runtime/state/ttl/TtlFoldingStateTest.java | 71 ++++++++
.../runtime/state/ttl/TtlListStateTest.java | 112 ++++++++++++
.../state/ttl/TtlMapStatePerElementTest.java | 47 +++++
.../runtime/state/ttl/TtlMapStateTest.java | 60 +++++++
.../runtime/state/ttl/TtlMergingStateBase.java | 126 ++++++++++++++
.../runtime/state/ttl/TtlReducingStateTest.java | 94 ++++++++++
.../runtime/state/ttl/TtlStateTestBase.java | 162 +++++++++++++++++
.../runtime/state/ttl/TtlValueStateTest.java | 62 +++++++
27 files changed, 2243 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
new file mode 100644
index 0000000..12efc00
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Common base for wrappers that add time-to-live (TTL) handling to another object.
+ *
+ * <p>It keeps the wrapped object together with the settings read from {@link TtlConfig}
+ * and offers helpers to stamp user values with a timestamp and to test them for expiration.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+    /** Wrapped original state handler. */
+    final T original;
+
+    /** TTL configuration this decorator was created with. */
+    final TtlConfig config;
+
+    /** Source of the current timestamp used to stamp values and to check expiration. */
+    final TtlTimeProvider timeProvider;
+
+    /** Whether to renew expiration timestamp on state read access. */
+    final boolean updateTsOnRead;
+
+    /** Whether an expired value may still be returned as long as it has not been cleaned up. */
+    final boolean returnExpired;
+
+    /** State value time to live in milliseconds. */
+    final long ttl;
+
+    /**
+     * Creates the decorator and caches the flags derived from the configuration.
+     *
+     * @param original object to wrap, must not be null
+     * @param config TTL configuration, must not be null and must not have the update type {@code Disabled}
+     * @param timeProvider provider of the current timestamp, must not be null
+     */
+    AbstractTtlDecorator(
+        T original,
+        TtlConfig config,
+        TtlTimeProvider timeProvider) {
+        Preconditions.checkNotNull(original);
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(timeProvider);
+        TtlConfig.TtlUpdateType updateType = config.getTtlUpdateType();
+        Preconditions.checkArgument(updateType != TtlConfig.TtlUpdateType.Disabled,
+            "State does not need to be wrapped with TTL if it is configured as disabled.");
+        this.original = original;
+        this.config = config;
+        this.timeProvider = timeProvider;
+        this.updateTsOnRead = updateType == TtlConfig.TtlUpdateType.OnReadAndWrite;
+        this.returnExpired = config.getStateVisibility() == TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
+        this.ttl = config.getTtl().toMilliseconds();
+    }
+
+    /**
+     * Unwraps the user value, hiding it if it is expired and expired values must not be returned.
+     *
+     * @return the user value, or {@code null} if the input is {@code null} or must be hidden
+     */
+    <V> V getUnexpired(TtlValue<V> ttlValue) {
+        if (ttlValue == null) {
+            return null;
+        }
+        if (expired(ttlValue) && !returnExpired) {
+            return null;
+        }
+        return ttlValue.getUserValue();
+    }
+
+    /**
+     * Checks whether the expiration timestamp of the value has been reached.
+     *
+     * @return {@code false} for {@code null}, otherwise whether the expiration timestamp
+     *     is not after the current timestamp of the time provider
+     */
+    <V> boolean expired(TtlValue<V> ttlValue) {
+        if (ttlValue == null) {
+            return false;
+        }
+        return getExpirationTimestamp(ttlValue) <= timeProvider.currentTimestamp();
+    }
+
+    /** Computes last access timestamp plus TTL, capping the TTL so that the sum cannot overflow. */
+    private long getExpirationTimestamp(@Nonnull TtlValue<?> ttlValue) {
+        long lastAccess = ttlValue.getLastAccessTimestamp();
+        long cappedTtl = ttl;
+        if (lastAccess > 0) {
+            // only a positive timestamp can push the sum beyond Long.MAX_VALUE
+            cappedTtl = Math.min(Long.MAX_VALUE - lastAccess, ttl);
+        }
+        return lastAccess + cappedTtl;
+    }
+
+    /** Wraps the value with the current timestamp of the time provider ({@code null} stays {@code null}). */
+    <V> TtlValue<V> wrapWithTs(V value) {
+        return wrapWithTs(value, timeProvider.currentTimestamp());
+    }
+
+    /** Wraps the value with the given timestamp ({@code null} stays {@code null}). */
+    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+        if (value == null) {
+            return null;
+        }
+        return new TtlValue<>(value, ts);
+    }
+
+    /** Wraps the user value of the given TTL value again, now with the current timestamp. */
+    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+        return wrapWithTs(ttlValue.getUserValue());
+    }
+
+    /**
+     * Reads a TTL value and applies the TTL logic to it.
+     *
+     * <p>An expired value triggers {@code stateClear} and is only returned if expired values
+     * are allowed to be returned. An unexpired value is written back through {@code updater}
+     * with a fresh timestamp if the timestamp has to be renewed on read.
+     *
+     * @param getter supplies the stored TTL value
+     * @param updater stores the TTL value with the renewed timestamp
+     * @param stateClear clears the stored value
+     * @return the user value or {@code null} if nothing is stored or the value must be hidden
+     */
+    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
+        SupplierWithException<TtlValue<V>, SE> getter,
+        ThrowingConsumer<TtlValue<V>, CE> updater,
+        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
+        TtlValue<V> ttlValue = getter.get();
+        if (ttlValue == null) {
+            return null;
+        }
+        if (expired(ttlValue)) {
+            stateClear.run();
+            return returnExpired ? ttlValue.getUserValue() : null;
+        }
+        if (updateTsOnRead) {
+            updater.accept(rewrapWithNewTs(ttlValue));
+        }
+        return ttlValue.getUserValue();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
new file mode 100644
index 0000000..ddab8f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * Common base for state wrappers that add TTL handling on top of an internal keyed state.
+ *
+ * <p>Key and namespace handling is forwarded to the wrapped state. The value serializer
+ * of the user values is kept here because the wrapped state stores values with TTL.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <SV> The type of values kept internally in state without TTL
+ * @param <TTLSV> The type of values kept internally in state with TTL
+ * @param <S> Type of originally wrapped state object
+ */
+abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
+    extends AbstractTtlDecorator<S>
+    implements InternalKvState<K, N, SV> {
+
+    /** Serializer of the user values, i.e. of the values without TTL. */
+    private final TypeSerializer<SV> valueSerializer;
+
+    /**
+     * @param original wrapped state which stores the values with TTL
+     * @param config TTL configuration
+     * @param timeProvider provider of the current timestamp
+     * @param valueSerializer serializer returned by {@link #getValueSerializer()}
+     */
+    AbstractTtlState(
+        S original,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<SV> valueSerializer) {
+        super(original, config, timeProvider);
+        this.valueSerializer = valueSerializer;
+    }
+
+    /**
+     * Same as the three argument variant of the base class,
+     * using {@code clear()} of the wrapped state to drop an expired value.
+     */
+    <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
+        SupplierWithException<TtlValue<T>, SE> getter,
+        ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
+        return getWithTtlCheckAndUpdate(getter, updater, original::clear);
+    }
+
+    // ------------------------------------------------------------------------
+    //  serializers
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return original.getKeySerializer();
+    }
+
+    @Override
+    public TypeSerializer<N> getNamespaceSerializer() {
+        return original.getNamespaceSerializer();
+    }
+
+    @Override
+    public TypeSerializer<SV> getValueSerializer() {
+        return valueSerializer;
+    }
+
+    // ------------------------------------------------------------------------
+    //  state access
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void setCurrentNamespace(N namespace) {
+        original.setCurrentNamespace(namespace);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    /** Always fails: serialized value access is not implemented for state with TTL. */
+    @Override
+    public byte[] getSerializedValue(
+        byte[] serializedKeyAndNamespace,
+        TypeSerializer<K> safeKeySerializer,
+        TypeSerializer<N> safeNamespaceSerializer,
+        TypeSerializer<SV> safeValueSerializer) {
+        throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
new file mode 100644
index 0000000..7d35ef3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregateFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * This class wraps aggregating function with TTL logic.
+ *
+ * <p>The accumulator handed to the wrapped function is the user value of a {@link TtlValue};
+ * every accumulator produced here is stamped with the current timestamp.
+ *
+ * @param <IN> The type of the values that are aggregated (input values)
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> The type of the aggregated result
+ */
+class TtlAggregateFunction<IN, ACC, OUT>
+    extends AbstractTtlDecorator<AggregateFunction<IN, ACC, OUT>>
+    implements AggregateFunction<IN, TtlValue<ACC>, OUT> {
+
+    /** Clears the stored accumulator; has to be set before {@link #getResult(TtlValue)} is called. */
+    ThrowingRunnable<Exception> stateClear;
+
+    /** Stores an accumulator with renewed timestamp; has to be set before {@link #getResult(TtlValue)} is called. */
+    ThrowingConsumer<TtlValue<ACC>, Exception> updater;
+
+    TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, TtlConfig config, TtlTimeProvider timeProvider) {
+        super(aggFunction, config, timeProvider);
+    }
+
+    /** Creates a fresh accumulator of the wrapped function, stamped with the current timestamp. */
+    @Override
+    public TtlValue<ACC> createAccumulator() {
+        return wrapWithTs(original.createAccumulator());
+    }
+
+    /** Adds the value to the accumulator, starting from a fresh one if nothing usable is stored. */
+    @Override
+    public TtlValue<ACC> add(IN value, TtlValue<ACC> accumulator) {
+        ACC userAcc = getUnexpired(accumulator);
+        if (userAcc == null) {
+            userAcc = original.createAccumulator();
+        }
+        return wrapWithTs(original.add(value, userAcc));
+    }
+
+    /**
+     * Applies the TTL check (and timestamp update) to the accumulator and computes the result from it.
+     *
+     * @return result of the wrapped function or {@code null} if there is no accumulator to use
+     * @throws FlinkRuntimeException if the TTL check, the update or the clearing fails
+     */
+    @Override
+    public OUT getResult(TtlValue<ACC> accumulator) {
+        Preconditions.checkNotNull(updater, "State updater should be set in TtlAggregatingState");
+        Preconditions.checkNotNull(stateClear, "State clearing should be set in TtlAggregatingState");
+        final ACC userAcc;
+        try {
+            userAcc = getWithTtlCheckAndUpdate(() -> accumulator, updater, stateClear);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to retrieve original internal aggregating state", e);
+        }
+        if (userAcc == null) {
+            return null;
+        }
+        return original.getResult(userAcc);
+    }
+
+    /**
+     * Merges the usable accumulators. If only one of them is usable, it is returned
+     * with a new timestamp; if none is usable, the result is {@code null}.
+     */
+    @Override
+    public TtlValue<ACC> merge(TtlValue<ACC> a, TtlValue<ACC> b) {
+        ACC userA = getUnexpired(a);
+        ACC userB = getUnexpired(b);
+        if (userA == null && userB == null) {
+            return null;
+        }
+        if (userB == null) {
+            return rewrapWithNewTs(a);
+        }
+        if (userA == null) {
+            return rewrapWithNewTs(b);
+        }
+        return wrapWithTs(original.merge(userA, userB));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
new file mode 100644
index 0000000..097e49b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * <p>{@link #get()} and {@link #add(Object)} are forwarded to the wrapped state as they are;
+ * the constructor connects the given {@link TtlAggregateFunction} with the wrapped state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+    /**
+     * @param originalState wrapped state which stores accumulators with TTL
+     * @param config TTL configuration
+     * @param timeProvider provider of the current timestamp
+     * @param valueSerializer serializer of the accumulator without TTL
+     * @param aggregateFunction TTL aggregate function which gets the clear and update
+     *     operations of the wrapped state assigned
+     */
+    TtlAggregatingState(
+        InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<ACC> valueSerializer,
+        TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        super(originalState, config, timeProvider, valueSerializer);
+        // the function needs access to the state to drop or refresh the accumulator on read
+        aggregateFunction.stateClear = originalState::clear;
+        aggregateFunction.updater = originalState::updateInternal;
+    }
+
+    // ------------------------------------------------------------------------
+    //  user facing operations, forwarded to the wrapped state
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void add(IN value) throws Exception {
+        original.add(value);
+    }
+
+    @Override
+    public OUT get() throws Exception {
+        return original.get();
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    // ------------------------------------------------------------------------
+    //  internal accumulator access
+    // ------------------------------------------------------------------------
+
+    /** Returns the stored accumulator after the TTL check, renewing its timestamp if configured. */
+    @Override
+    public ACC getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    /** Stores the accumulator stamped with the current timestamp. */
+    @Override
+    public void updateInternal(ACC valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
new file mode 100644
index 0000000..17cac49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+ /**
+ * This option value configures when to update last access timestamp which prolongs state TTL.
+ */
+ public enum TtlUpdateType {
+ /** TTL is disabled. State does not expire. */
+ Disabled,
+ /** Last access timestamp is initialised when state is created and updated on every write operation. */
+ OnCreateAndWrite,
+ /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
+ OnReadAndWrite
+ }
+
+ /**
+ * This option configures whether expired user value can be returned or not.
+ */
+ public enum TtlStateVisibility {
+ /** Return expired user value if it is not cleaned up yet. */
+ ReturnExpiredIfNotCleanedUp,
+ /** Never return expired user value. */
+ NeverReturnExpired
+ }
+
+ /**
+ * This option configures time scale to use for ttl.
+ */
+ public enum TtlTimeCharacteristic {
+ /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
+ ProcessingTime
+ }
+
+ private final TtlUpdateType ttlUpdateType;
+ private final TtlStateVisibility stateVisibility;
+ private final TtlTimeCharacteristic timeCharacteristic;
+ private final Time ttl;
+
+ public TtlConfig(
+ TtlUpdateType ttlUpdateType,
+ TtlStateVisibility stateVisibility,
+ TtlTimeCharacteristic timeCharacteristic,
+ Time ttl) {
+ Preconditions.checkNotNull(ttlUpdateType);
+ Preconditions.checkNotNull(stateVisibility);
+ Preconditions.checkNotNull(timeCharacteristic);
+ Preconditions.checkNotNull(ttl);
+ Preconditions.checkArgument(ttl.toMilliseconds() > 0,
+ "TTL is expected to be positive");
+ this.ttlUpdateType = ttlUpdateType;
+ this.stateVisibility = stateVisibility;
+ this.timeCharacteristic = timeCharacteristic;
+ this.ttl = ttl;
+ }
+
+ public TtlUpdateType getTtlUpdateType() {
+ return ttlUpdateType;
+ }
+
+ public TtlStateVisibility getStateVisibility() {
+ return stateVisibility;
+ }
+
+ public Time getTtl() {
+ return ttl;
+ }
+
+ public TtlTimeCharacteristic getTimeCharacteristic() {
+ return timeCharacteristic;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
new file mode 100644
index 0000000..8d54d87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * <p>Folding starts from the default accumulator whenever the stored accumulator
+ * is absent or must not be used because it is expired.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction<T, ACC>
+    extends AbstractTtlDecorator<FoldFunction<T, ACC>>
+    implements FoldFunction<T, TtlValue<ACC>> {
+
+    /** Accumulator to fold into if there is no usable stored accumulator. */
+    private final ACC defaultAccumulator;
+
+    TtlFoldFunction(
+        FoldFunction<T, ACC> original,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        ACC defaultAccumulator) {
+        super(original, config, timeProvider);
+        this.defaultAccumulator = defaultAccumulator;
+    }
+
+    /** Folds the value into the usable accumulator and stamps the result with the current timestamp. */
+    @Override
+    public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+        ACC userAcc = getUnexpired(accumulator);
+        if (userAcc == null) {
+            userAcc = defaultAccumulator;
+        }
+        ACC folded = original.fold(userAcc, value);
+        return wrapWithTs(folded);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
new file mode 100644
index 0000000..46f56d2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldingState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
+/**
+ * This class wraps folding state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link AggregatingState} instead
+ */
+@Deprecated
+class TtlFoldingState<K, N, T, ACC>
+    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalFoldingState<K, N, T, TtlValue<ACC>>>
+    implements InternalFoldingState<K, N, T, ACC> {
+
+    /**
+     * @param originalState wrapped state which stores accumulators with TTL
+     * @param config TTL configuration
+     * @param timeProvider provider of the current timestamp
+     * @param valueSerializer serializer of the accumulator without TTL
+     */
+    TtlFoldingState(
+        InternalFoldingState<K, N, T, TtlValue<ACC>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<ACC> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    /** Forwards the value to the wrapped state. */
+    @Override
+    public void add(T value) throws Exception {
+        original.add(value);
+    }
+
+    /** Returns the same as {@link #getInternal()}. */
+    @Override
+    public ACC get() throws Exception {
+        return getInternal();
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    /** Returns the stored accumulator after the TTL check, renewing its timestamp if configured. */
+    @Override
+    public ACC getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    /** Stores the accumulator stamped with the current timestamp. */
+    @Override
+    public void updateInternal(ACC valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
new file mode 100644
index 0000000..6da0d8f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * <p>Every list element is stored as its own {@link TtlValue}, so elements expire individually.
+ * Expired elements are filtered out while iterating over the result of {@link #get()}.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+ AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+ implements InternalListState<K, N, T> {
+ TtlListState(
+ InternalListState<K, N, TtlValue<T>> originalState,
+ TtlConfig config,
+ TtlTimeProvider timeProvider,
+ TypeSerializer<List<T>> valueSerializer) {
+ super(originalState, config, timeProvider, valueSerializer);
+ }
+
+ /** Replaces the stored list, same as {@link #updateInternal(List)}. */
+ @Override
+ public void update(List<T> values) throws Exception {
+ updateInternal(values);
+ }
+
+ /** Appends all values, each stamped with the current timestamp. The list must not be null. */
+ @Override
+ public void addAll(List<T> values) throws Exception {
+ Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+ original.addAll(withTs(values));
+ }
+
+ /**
+  * Returns an iterable over the user values of the stored elements.
+  *
+  * <p>If timestamps are renewed on read, the unexpired elements are written back with
+  * a new timestamp before the iterable is returned. The returned iterators skip expired
+  * elements (unless expired values may be returned) and clear the wrapped state
+  * when they reach the end without having seen any unexpired element.
+  */
+ @Override
+ public Iterable<T> get() throws Exception {
+ Iterable<TtlValue<T>> ttlValue = original.get();
+ // treat missing state as an empty list
+ ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+ if (updateTsOnRead) {
+ // materialize once so that the update and the returned iterable work on the same list
+ List<TtlValue<T>> collected = collect(ttlValue);
+ ttlValue = collected;
+ updateTs(collected);
+ }
+ final Iterable<TtlValue<T>> finalResult = ttlValue;
+ return () -> new IteratorWithCleanup(finalResult.iterator());
+ }
+
+ /**
+  * Writes the unexpired elements back with the current timestamp.
+  * Nothing is written if there is no unexpired element.
+  */
+ private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+ List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+ .filter(v -> !expired(v))
+ .map(this::rewrapWithNewTs)
+ .collect(Collectors.toList());
+ if (!unexpiredWithUpdatedTs.isEmpty()) {
+ original.update(unexpiredWithUpdatedTs);
+ }
+ }
+
+ /** Appends the value stamped with the current timestamp. The value must not be null. */
+ @Override
+ public void add(T value) throws Exception {
+ Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+ original.add(wrapWithTs(value));
+ }
+
+ @Override
+ public void clear() {
+ original.clear();
+ }
+
+ @Override
+ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+ original.mergeNamespaces(target, sources);
+ }
+
+ /** Returns the result of {@link #get()} collected into a list. */
+ @Override
+ public List<T> getInternal() throws Exception {
+ return collect(get());
+ }
+
+ /** Returns the iterable itself if it is already a list, otherwise copies its elements into a new list. */
+ private <E> List<E> collect(Iterable<E> iterable) {
+ return iterable instanceof List ? (List<E>) iterable :
+ StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+ }
+
+ /** Replaces the stored list with the given values, each stamped with the current timestamp. */
+ @Override
+ public void updateInternal(List<T> valueToStore) throws Exception {
+ Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+ original.updateInternal(withTs(valueToStore));
+ }
+
+ /** Wraps every value with the current timestamp. */
+ private List<TtlValue<T>> withTs(List<T> values) {
+ return values.stream().map(this::wrapWithTs).collect(Collectors.toList());
+ }
+
+ /**
+  * Iterator over user values which skips expired elements and clears the wrapped state
+  * once the end is reached without any unexpired element.
+  */
+ private class IteratorWithCleanup implements Iterator<T> {
+ private final Iterator<TtlValue<T>> originalIterator;
+ /** Set as soon as one unexpired element has been seen; prevents the cleanup. */
+ private boolean anyUnexpired = false;
+ /** Guards the cleanup so that the wrapped state is cleared at most once per iterator. */
+ private boolean uncleared = true;
+ /** Look-ahead element to return from {@link #next()}; null means none is buffered. */
+ private T nextUnexpired = null;
+
+ private IteratorWithCleanup(Iterator<TtlValue<T>> ttlIterator) {
+ this.originalIterator = ttlIterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // order matters: the look-ahead must be filled before the end-of-iteration check
+ findNextUnexpired();
+ cleanupIfEmpty();
+ return nextUnexpired != null;
+ }
+
+ /** Clears the wrapped state if the iteration is over and no unexpired element was found. */
+ private void cleanupIfEmpty() {
+ boolean endOfIter = !originalIterator.hasNext() && nextUnexpired == null;
+ if (uncleared && !anyUnexpired && endOfIter) {
+ original.clear();
+ uncleared = false;
+ }
+ }
+
+ @Override
+ public T next() {
+ if (hasNext()) {
+ T result = nextUnexpired;
+ // consume the look-ahead so that the next call advances
+ nextUnexpired = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ /** Advances the wrapped iterator until an element to return is buffered or the input is exhausted. */
+ private void findNextUnexpired() {
+ while (nextUnexpired == null && originalIterator.hasNext()) {
+ TtlValue<T> ttlValue = originalIterator.next();
+ // a null element stops the search for this call
+ if (ttlValue == null) {
+ break;
+ }
+ boolean unexpired = !expired(ttlValue);
+ if (unexpired) {
+ anyUnexpired = true;
+ }
+ // expired elements are only handed out if the configuration allows it
+ if (unexpired || returnExpired) {
+ nextUnexpired = ttlValue.getUserValue();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
new file mode 100644
index 0000000..352c6c9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * <p>User values are kept in the wrapped state as {@link TtlValue}s. Reads are routed through
+ * {@code getWithTtlCheckAndUpdate}, which receives callbacks to read, rewrite and remove the
+ * accessed entry of the wrapped state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+    extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
+    implements InternalMapState<K, N, UK, UV> {
+    TtlMapState(
+        InternalMapState<K, N, UK, TtlValue<UV>> original,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<Map<UK, UV>> valueSerializer) {
+        super(original, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public UV get(UK key) throws Exception {
+        return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+    }
+
+    @Override
+    public void put(UK key, UV value) throws Exception {
+        original.put(key, wrapWithTs(value));
+    }
+
+    /**
+     * Wraps every value of {@code map} and stores the result in the wrapped state in one call.
+     * A {@code null} map is ignored.
+     */
+    @Override
+    public void putAll(Map<UK, UV> map) throws Exception {
+        if (map == null) {
+            return;
+        }
+        Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
+        // Iterate entries directly instead of looking every key up a second time.
+        for (Map.Entry<UK, UV> entry : map.entrySet()) {
+            ttlMap.put(entry.getKey(), wrapWithTs(entry.getValue()));
+        }
+        original.putAll(ttlMap);
+    }
+
+    @Override
+    public void remove(UK key) throws Exception {
+        original.remove(key);
+    }
+
+    /** Implemented via {@link #get(Object)}: true if that call returns a non-null value. */
+    @Override
+    public boolean contains(UK key) throws Exception {
+        return get(key) != null;
+    }
+
+    /**
+     * Returns a lazily filtered view of the entries of the wrapped state.
+     *
+     * <p>Each call to {@link Iterable#iterator()} builds a fresh stream, so the result can be
+     * traversed more than once. Handing out {@code stream::iterator} for one stream instance
+     * would fail with {@link IllegalStateException} on the second traversal.
+     */
+    @Override
+    public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
+        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = originalEntries();
+        return () -> entriesStream(withTs).iterator();
+    }
+
+    /** Reads the entries of the wrapped state, mapping a {@code null} result to an empty one. */
+    private Iterable<Map.Entry<UK, TtlValue<UV>>> originalEntries() throws Exception {
+        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
+        return withTs == null ? Collections.emptyList() : withTs;
+    }
+
+    /** Builds a new lazy stream of unwrapped entries that pass the TTL check. */
+    private Stream<Map.Entry<UK, UV>> entriesStream(Iterable<Map.Entry<UK, TtlValue<UV>>> withTs) {
+        return StreamSupport
+            .stream(withTs.spliterator(), false)
+            .filter(this::unexpiredAndUpdateOrCleanup)
+            .map(TtlMapState::unwrapWithoutTs);
+    }
+
+    /**
+     * Runs the TTL check for one entry, letting it rewrite or remove the entry in the wrapped
+     * state, and reports whether a user value is left to return. Checked exceptions are
+     * rethrown as {@link FlinkRuntimeException} because this is used as a stream predicate.
+     */
+    private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
+        UV unexpiredValue;
+        try {
+            unexpiredValue = getWithTtlCheckAndUpdate(
+                e::getValue,
+                v -> original.put(e.getKey(), v),
+                () -> original.remove(e.getKey()));
+        } catch (Exception ex) {
+            throw new FlinkRuntimeException(ex);
+        }
+        return unexpiredValue != null;
+    }
+
+    private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
+        return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+    }
+
+    /** Keys of the entries that pass the TTL check; re-iterable like {@link #entries()}. */
+    @Override
+    public Iterable<UK> keys() throws Exception {
+        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = originalEntries();
+        return () -> entriesStream(withTs).map(Map.Entry::getKey).iterator();
+    }
+
+    /** Values of the entries that pass the TTL check; re-iterable like {@link #entries()}. */
+    @Override
+    public Iterable<UV> values() throws Exception {
+        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = originalEntries();
+        return () -> entriesStream(withTs).map(Map.Entry::getValue).iterator();
+    }
+
+    @Override
+    public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
+        return entriesStream(originalEntries()).iterator();
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
new file mode 100644
index 0000000..de593c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+/**
+ * Reduce function over {@link TtlValue}s that delegates to the user's {@link ReduceFunction}.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReduceFunction<T>
+    extends AbstractTtlDecorator<ReduceFunction<T>>
+    implements ReduceFunction<TtlValue<T>> {
+
+    TtlReduceFunction(
+        ReduceFunction<T> originalReduceFunction,
+        TtlConfig config,
+        TtlTimeProvider timeProvider) {
+        super(originalReduceFunction, config, timeProvider);
+    }
+
+    /**
+     * Reduces the two operands, looking at each through {@code getUnexpired}.
+     *
+     * @return the wrapped result of the user function if both operands yield a value, the
+     *     re-wrapped single operand if only one does, or {@code null} if neither does
+     */
+    @Override
+    public TtlValue<T> reduce(TtlValue<T> value1, TtlValue<T> value2) throws Exception {
+        T left = getUnexpired(value1);
+        T right = getUnexpired(value2);
+        if (left == null && right == null) {
+            return null;
+        }
+        if (left == null) {
+            return rewrapWithNewTs(value2);
+        }
+        if (right == null) {
+            return rewrapWithNewTs(value1);
+        }
+        return wrapWithTs(original.reduce(left, right));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
new file mode 100644
index 0000000..9169d45
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+    extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+    implements InternalReducingState<K, N, T> {
+    TtlReducingState(
+        InternalReducingState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<T> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public T get() throws Exception {
+        return getInternal();
+    }
+
+    /**
+     * Adds the value to the wrapped state, wrapped with the same single-argument
+     * {@code wrapWithTs} that {@link #updateInternal(Object)} uses.
+     *
+     * <p>The value must not be stamped with {@code Long.MAX_VALUE}: if the wrapped state stores
+     * an added value as-is (presumably the case for the first value added to empty state, where
+     * there is nothing to reduce with; confirm per backend), that timestamp would be persisted
+     * and the entry would never age.
+     */
+    @Override
+    public void add(T value) throws Exception {
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    /** Reads the wrapped state through the TTL check, passing it the state's own updater. */
+    @Override
+    public T getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    @Override
+    public void updateInternal(T valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
new file mode 100644
index 0000000..bac9d36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * Source of the time that the TTL logic uses to judge whether state has expired.
+ */
+interface TtlTimeProvider {
+ /**
+  * Returns the current timestamp.
+  * NOTE(review): unit and reference point are not visible from this interface; confirm with implementations.
+  */
+ long currentTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
new file mode 100644
index 0000000..99f8b0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * <p>Immutable pair of a non-null user value and the timestamp of its last access.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+    // Explicit id so the serialized form does not depend on a compiler-derived default.
+    private static final long serialVersionUID = 1L;
+
+    private final T userValue;
+    private final long lastAccessTimestamp;
+
+    /**
+     * @param userValue the user value to wrap; must not be {@code null}
+     * @param lastAccessTimestamp timestamp to record alongside the value
+     * @throws NullPointerException if {@code userValue} is {@code null}
+     */
+    TtlValue(T userValue, long lastAccessTimestamp) {
+        Preconditions.checkNotNull(userValue);
+        this.userValue = userValue;
+        this.lastAccessTimestamp = lastAccessTimestamp;
+    }
+
+    /** Returns the wrapped user value (never {@code null}). */
+    T getUserValue() {
+        return userValue;
+    }
+
+    /** Returns the timestamp passed at construction. */
+    long getLastAccessTimestamp() {
+        return lastAccessTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
new file mode 100644
index 0000000..9feb62d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+import java.io.IOException;
+
+/**
+ * Value state decorator that stores user values as {@link TtlValue}s in the wrapped state.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValueState<K, N, T>
+    extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
+    implements InternalValueState<K, N, T> {
+    TtlValueState(
+        InternalValueState<K, N, TtlValue<T>> originalState,
+        TtlConfig config,
+        TtlTimeProvider timeProvider,
+        TypeSerializer<T> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    /** Reads the wrapped state through the TTL check, passing it the state's own updater. */
+    @Override
+    public T value() throws IOException {
+        return getWithTtlCheckAndUpdate(
+            () -> original.value(),
+            ttlValue -> original.update(ttlValue));
+    }
+
+    /** Wraps the user value and writes it to the wrapped state. */
+    @Override
+    public void update(T value) throws IOException {
+        TtlValue<T> wrapped = wrapWithTs(value);
+        original.update(wrapped);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java
new file mode 100644
index 0000000..878d888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalKvState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * In-memory test double for {@link InternalKvState}.
+ *
+ * <p>Keeps one value for the case where no namespace is set ({@code currentNamespace == null})
+ * and one value per namespace otherwise. All serializer accessors return {@code null}.
+ */
+class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
+    private final Map<N, T> namespacedValues = new HashMap<>();
+    private final Supplier<T> emptyValue;
+    private T defaultNamespaceValue;
+    private N currentNamespace;
+
+    MockInternalKvState() {
+        this(() -> null);
+    }
+
+    MockInternalKvState(Supplier<T> emptyValue) {
+        this.emptyValue = emptyValue;
+        defaultNamespaceValue = emptyValue.get();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return null;
+    }
+
+    @Override
+    public TypeSerializer<N> getNamespaceSerializer() {
+        return null;
+    }
+
+    @Override
+    public TypeSerializer<T> getValueSerializer() {
+        return null;
+    }
+
+    @Override
+    public void setCurrentNamespace(N namespace) {
+        currentNamespace = namespace;
+    }
+
+    @Override
+    public byte[] getSerializedValue(
+        byte[] serializedKeyAndNamespace,
+        TypeSerializer safeKeySerializer,
+        TypeSerializer safeNamespaceSerializer,
+        TypeSerializer safeValueSerializer) {
+        return null;
+    }
+
+    /** Resets the value of the current namespace (or of the no-namespace slot). */
+    @Override
+    public void clear() {
+        if (currentNamespace != null) {
+            namespacedValues.remove(currentNamespace);
+            return;
+        }
+        defaultNamespaceValue = emptyValue.get();
+    }
+
+    /**
+     * Returns the value for the current namespace, falling back to a fresh empty value.
+     * The returned value is also written back, so an empty value handed out here stays stored.
+     */
+    public T getInternal() {
+        T value;
+        if (currentNamespace == null) {
+            value = defaultNamespaceValue;
+        } else {
+            value = namespacedValues.getOrDefault(currentNamespace, emptyValue.get());
+        }
+        updateInternal(value);
+        return value;
+    }
+
+    /** Stores the value for the current namespace (or in the no-namespace slot). */
+    @SuppressWarnings("WeakerAccess")
+    public void updateInternal(T valueToStore) {
+        if (currentNamespace != null) {
+            namespacedValues.put(currentNamespace, valueToStore);
+        } else {
+            defaultNamespaceValue = valueToStore;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java
new file mode 100644
index 0000000..c548017
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMapState.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+class MockInternalMapState<K, N, UK, UV>
+ extends MockInternalKvState<K, N, Map<UK, UV>>
+ implements InternalMapState<K, N, UK, UV> {
+
+ MockInternalMapState() {
+ super(HashMap::new);
+ }
+
+ @Override
+ public void clear() {
+ getInternal().clear();
+ }
+
+ @Override
+ public UV get(UK key) {
+ return getInternal().get(key);
+ }
+
+ @Override
+ public void put(UK key, UV value) {
+ this.getInternal().put(key, value);
+ }
+
+ @Override
+ public void putAll(Map<UK, UV> map) {
+ getInternal().putAll(map);
+ }
+
+ @Override
+ public void remove(UK key) {
+ getInternal().remove(key);
+ }
+
+ @Override
+ public boolean contains(UK key) {
+ return getInternal().containsKey(key);
+ }
+
+ @Override
+ public Iterable<Map.Entry<UK, UV>> entries() {
+ return copy().entrySet();
+ }
+
+ private Map<UK, UV> copy() {
+ return new HashMap<>(getInternal());
+ }
+
+ @Override
+ public Iterable<UK> keys() {
+ return copy().keySet();
+ }
+
+ @Override
+ public Iterable<UV> values() {
+ return copy().values();
+ }
+
+ @Override
+ public Iterator<Map.Entry<UK, UV>> iterator() {
+ return entries().iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java
new file mode 100644
index 0000000..582faf3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockInternalMergingState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Collection;
+import java.util.function.Supplier;
+
+/**
+ * Base for in-memory test doubles of {@link InternalMergingState}; subclasses supply how two
+ * accumulators are combined.
+ */
+abstract class MockInternalMergingState<K, N, IN, ACC, OUT>
+    extends MockInternalKvState<K, N, ACC> implements InternalMergingState<K, N, IN, ACC, OUT> {
+
+    MockInternalMergingState() {
+        super();
+    }
+
+    MockInternalMergingState(Supplier<ACC> emptyValue) {
+        super(emptyValue);
+    }
+
+    /**
+     * Combines the non-null accumulators of all source namespaces, in iteration order, and
+     * stores the result under {@code target}. Nothing is written if no source had a value.
+     * The current namespace is left at the last namespace visited.
+     */
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        ACC merged = null;
+        for (N source : sources) {
+            setCurrentNamespace(source);
+            ACC sourceAcc = getInternal();
+            if (sourceAcc == null) {
+                continue;
+            }
+            merged = merged == null ? sourceAcc : mergeState(merged, sourceAcc);
+        }
+        if (merged == null) {
+            return;
+        }
+        setCurrentNamespace(target);
+        updateInternal(merged);
+    }
+
+    /** Combines two non-null accumulators into one. */
+    abstract ACC mergeState(ACC acc, ACC nAcc) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
new file mode 100644
index 0000000..e14c3f8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Test implementation of {@link TtlTimeProvider} that reports the value of a mutable field. */
+class MockTimeProvider implements TtlTimeProvider {
+ // Value returned by currentTimestamp(); starts at 0 and is non-final and package-visible.
+ long time = 0;
+
+ @Override
+ public long currentTimestamp() {
+ return time;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
new file mode 100644
index 0000000..477f057
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Test suite for {@link TtlAggregatingState}. */
+public class TtlAggregatingStateTest
+ extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
+ private static final long DEFAULT_ACCUMULATOR = 3L;
+
+ @Override
+ TtlAggregatingState<?, String, Integer, Long, String> createState() {
+ TtlAggregateFunction<Integer, Long, String> ttlAggregateFunction =
+ new TtlAggregateFunction<>(AGGREGATE, ttlConfig, timeProvider);
+ return new TtlAggregatingState<>(
+ new MockInternalTtlAggregatingState<>(ttlAggregateFunction),
+ ttlConfig, timeProvider, null, ttlAggregateFunction);
+ }
+
+ @Override
+ void initTestValues() {
+ updater = v -> ttlState.add(v);
+ getter = () -> ttlState.get();
+ originalGetter = () -> ttlState.original.get();
+
+ updateEmpty = 5;
+ updateUnexpired = 7;
+ updateExpired = 6;
+
+ getUpdateEmpty = "8";
+ getUnexpired = "15";
+ getUpdateExpired = "9";
+ }
+
+ @Override
+ String getMergeResult(
+ List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+ Set<String> namespaces = new HashSet<>();
+ unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+ finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+ return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) +
+ namespaces.size() * (int) DEFAULT_ACCUMULATOR);
+ }
+
+ private static class MockInternalTtlAggregatingState<K, N, IN, ACC, OUT>
+ extends MockInternalMergingState<K, N, IN, ACC, OUT> implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+ private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
+
+ private MockInternalTtlAggregatingState(AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+ this.aggregateFunction = aggregateFunction;
+ }
+
+ @Override
+ public OUT get() {
+ return aggregateFunction.getResult(getInternal());
+ }
+
+ @Override
+ public void add(IN value) {
+ updateInternal(aggregateFunction.add(value, getInternal()));
+ }
+
+ @Override
+ ACC mergeState(ACC acc, ACC nAcc) {
+ return aggregateFunction.merge(acc, nAcc);
+ }
+ }
+
+ private static final AggregateFunction<Integer, Long, String> AGGREGATE =
+ new AggregateFunction<Integer, Long, String>() {
+ @Override
+ public Long createAccumulator() {
+ return DEFAULT_ACCUMULATOR;
+ }
+
+ @Override
+ public Long add(Integer value, Long accumulator) {
+ return accumulator + value;
+ }
+
+ @Override
+ public String getResult(Long accumulator) {
+ return accumulator.toString();
+ }
+
+ @Override
+ public Long merge(Long a, Long b) {
+ return a + b;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
new file mode 100644
index 0000000..01d5ee1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
+/** Test suite for {@link TtlFoldingState}. */
+public class TtlFoldingStateTest extends TtlStateTestBase<TtlFoldingState<?, String, Long, String>, Long, String> {
+    /** User fold: parses the accumulator as a long (0 if absent), adds the value if present. */
+    private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
+        long sum = 0;
+        if (acc != null) {
+            sum = Long.parseLong(acc);
+        }
+        if (val != null) {
+            sum += val;
+        }
+        return Long.toString(sum);
+    };
+
+    @Override
+    TtlFoldingState<?, String, Long, String> createState() {
+        FoldFunction<Long, TtlValue<String>> ttlFoldFunction =
+            new TtlFoldFunction<>(FOLD, ttlConfig, timeProvider, "1");
+        return new TtlFoldingState<>(
+            new MockInternalFoldingState<>(ttlFoldFunction),
+            ttlConfig,
+            timeProvider,
+            null);
+    }
+
+    @Override
+    void initTestValues() {
+        // Accessors stay lambdas so that ttlState is read when they run, not when they are set.
+        updater = v -> ttlState.add(v);
+        getter = () -> ttlState.get();
+        originalGetter = () -> ttlState.original.get();
+
+        updateEmpty = 5L;
+        updateUnexpired = 7L;
+        updateExpired = 6L;
+
+        // Expected reads: each update plus the "1" passed to TtlFoldFunction, or both updates plus it.
+        getUpdateEmpty = "6";
+        getUnexpired = "13";
+        getUpdateExpired = "7";
+    }
+
+    /** In-memory folding state that folds every added value into the stored accumulator. */
+    private static class MockInternalFoldingState<K, N, T, ACC>
+        extends MockInternalKvState<K, N, ACC> implements InternalFoldingState<K, N, T, ACC> {
+        private final FoldFunction<T, ACC> foldFunction;
+
+        private MockInternalFoldingState(FoldFunction<T, ACC> foldFunction) {
+            this.foldFunction = foldFunction;
+        }
+
+        @Override
+        public ACC get() {
+            return getInternal();
+        }
+
+        @Override
+        public void add(T value) throws Exception {
+            ACC folded = foldFunction.fold(get(), value);
+            updateInternal(folded);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
new file mode 100644
index 0000000..5128ff2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/** Test suite for {@link TtlListState}. */
+public class TtlListStateTest extends TtlMergingStateBase<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
+	@Override
+	TtlListState<?, String, Integer> createState() {
+		return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updater = v -> ttlState.addAll(v);
+		getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
+		originalGetter = () -> ttlState.original.get();
+
+		emptyValue = Collections.emptyList();
+
+		updateEmpty = Arrays.asList(5, 7, 10);
+		updateUnexpired = Arrays.asList(8, 9, 11);
+		updateExpired = Arrays.asList(1, 4);
+
+		getUpdateEmpty = updateEmpty;
+		getUnexpired = updateUnexpired;
+		getUpdateExpired = updateExpired;
+	}
+
+	@Override
+	List<Integer> generateRandomUpdate() {
+		return IntStream.range(0, RANDOM.nextInt(5)).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
+	}
+
+	/**
+	 * Expected merged list: the elements of the final updates in the order they were applied; the earlier batch is ignored.
+	 * NOTE(review): presumably the earlier elements have outlived their TTL by read time -- confirm in TtlStateTestBase.
+	 */
+	@Override
+	Iterable<Integer> getMergeResult(
+		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
+		return finalUpdatesToMerge.stream().flatMap(t -> t.f1.stream()).collect(Collectors.toList());
+	}
+
+	/** List state mock whose value is a mutable list; the constructor hands {@code ArrayList::new} to the base mock. */
+	private static class MockInternalListState<K, N, T>
+		extends MockInternalMergingState<K, N, T, List<T>, Iterable<T>> implements InternalListState<K, N, T> {
+		MockInternalListState() {
+			super(ArrayList::new);
+		}
+
+		/** Stores a copy: the caller's list may be unmodifiable and must not be changed by later add/addAll/clear calls. */
+		@Override
+		public void update(List<T> elements) {
+			updateInternal(new ArrayList<>(elements)); // assumes updateInternal keeps the reference it is given
+		}
+
+		@Override
+		public void addAll(List<T> elements) {
+			getInternal().addAll(elements);
+		}
+
+		@Override
+		List<T> mergeState(List<T> acc, List<T> nAcc) {
+			List<T> merged = new ArrayList<>(acc); // neither input list is modified
+			merged.addAll(nAcc);
+			return merged;
+		}
+
+		@Override
+		public Iterable<T> get() {
+			return getInternal();
+		}
+
+		@Override
+		public void add(T element) {
+			getInternal().add(element);
+		}
+
+		@Override
+		public void clear() {
+			getInternal().clear();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
new file mode 100644
index 0000000..ac9b038
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Test suite for per element methods of {@link TtlMapState}. */
+public class TtlMapStatePerElementTest extends TtlStateTestBase<TtlMapState<?, String, Integer, String>, String, String> {
+	private static final int KEY = 1;
+	private static final String VALUE_FOR_EMPTY = "test value1";
+	private static final String VALUE_FOR_UNEXPIRED = "test value2";
+	private static final String VALUE_FOR_EXPIRED = "test value3";
+
+	@Override
+	TtlMapState<?, String, Integer, String> createState() {
+		return new TtlMapState<>(new MockInternalMapState<>(), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		// every phase puts a different value under the one key and expects to read that same value back
+		updateEmpty = VALUE_FOR_EMPTY;
+		getUpdateEmpty = VALUE_FOR_EMPTY;
+		updateUnexpired = VALUE_FOR_UNEXPIRED;
+		getUnexpired = VALUE_FOR_UNEXPIRED;
+		updateExpired = VALUE_FOR_EXPIRED;
+		getUpdateExpired = VALUE_FOR_EXPIRED;
+
+		updater = v -> ttlState.put(KEY, v);
+		getter = () -> ttlState.get(KEY);
+		originalGetter = () -> ttlState.original.get(KEY);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
new file mode 100644
index 0000000..535962b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Test suite for collection methods of {@link TtlMapState}. */
+public class TtlMapStateTest extends
+	TtlStateTestBase<TtlMapState<?, String, Integer, String>, Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
+	@Override
+	TtlMapState<?, String, Integer, String> createState() {
+		return new TtlMapState<>(new MockInternalMapState<>(), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
+		getUpdateEmpty = updateEmpty.entrySet();
+
+		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
+		getUnexpired = updateUnexpired.entrySet();
+
+		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+		getUpdateExpired = updateExpired.entrySet();
+
+		emptyValue = Collections.emptySet();
+		updater = map -> ttlState.putAll(map);
+		getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
+		originalGetter = () -> ttlState.original.entries();
+	}
+
+	/** Collects the given key/value tuples into a map; as with {@link Collectors#toMap}, duplicate keys are rejected. */
+	@SafeVarargs
+	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
+		return Arrays.stream(entries).collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
new file mode 100644
index 0000000..6a7aebe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/** Base for TTL tests of merging states: adds a namespace merge scenario on top of {@link TtlStateTestBase}. */
+abstract class TtlMergingStateBase<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
+	extends TtlStateTestBase<S, UV, GV> {
+	static final Random RANDOM = new Random();
+
+	private static final List<String> NAMESPACES = Arrays.asList(
+		"unsetNamespace1", "unsetNamespace2", // never written by this test
+		"expiredNamespace", "expiredAndUpdatedNamespace", "unexpiredNamespace", "finalNamespace");
+
+	/** Applies three update batches at times 0, 120 and 150, merges all namespaces at time 230 and checks the read. */
+	@Test
+	public void testMergeNamespaces() throws Exception {
+		initTest();
+
+		timeProvider.time = 0;
+		List<Tuple2<String, UV>> expiredUpdatesToMerge = generateExpiredUpdatesToMerge();
+		applyStateUpdates(expiredUpdatesToMerge);
+
+		timeProvider.time = 120;
+		List<Tuple2<String, UV>> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge();
+		applyStateUpdates(unexpiredUpdatesToMerge);
+
+		timeProvider.time = 150;
+		List<Tuple2<String, UV>> finalUpdatesToMerge = generateFinalUpdatesToMerge();
+		applyStateUpdates(finalUpdatesToMerge);
+
+		timeProvider.time = 230;
+		ttlState.mergeNamespaces("targetNamespace", NAMESPACES);
+		ttlState.setCurrentNamespace("targetNamespace");
+		assertEquals("Unexpected result of merge operation",
+			getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get());
+	}
+
+	private List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
+		);
+	}
+
+	private List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate())
+		);
+	}
+
+	private List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
+		return Arrays.asList(
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate()),
+			Tuple2.of("finalNamespace", generateRandomUpdate())
+		);
+	}
+
+	/** Creates one random update value for the state under test. */
+	abstract UV generateRandomUpdate();
+
+	private void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
+		for (Tuple2<String, UV> t : updates) {
+			ttlState.setCurrentNamespace(t.f0);
+			updater.accept(t.f1);
+		}
+	}
+
+	/** Computes the read expected after the merge from the batches applied at times 120 and 150. */
+	abstract GV getMergeResult(
+		List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, UV>> finalUpdatesToMerge);
+
+	/** Variant for integer updates: random values come from [0, 1000) and the merge result is their sum. */
+	abstract static class TtlIntegerMergingStateBase<
+		S extends InternalMergingState<?, String, ?, ?, GV>, UV extends Number, GV>
+		extends TtlMergingStateBase<S, UV, GV> {
+		@SuppressWarnings("unchecked")
+		@Override
+		UV generateRandomUpdate() {
+			return (UV) (Integer) RANDOM.nextInt(1000); // unchecked: only sound when UV is Integer
+		}
+
+		int getIntegerMergeResult(
+			List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+			List<Tuple2<String, UV>> finalUpdatesToMerge) {
+			return unexpiredUpdatesToMerge.stream().mapToInt(t -> t.f1.intValue()).sum() +
+				finalUpdatesToMerge.stream().mapToInt(t -> t.f1.intValue()).sum();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c13e00c/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
new file mode 100644
index 0000000..44043a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.List;
+
+/** Test suite for {@link TtlReducingState}. */
+public class TtlReducingStateTest
+	extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlReducingState<?, String, Integer>, Integer, Integer> {
+	/** Null-tolerant addition: a null operand yields the other operand (null if both are null). */
+	private static final ReduceFunction<Integer> REDUCE = (left, right) -> {
+		if (left == null) {
+			return right;
+		}
+		if (right == null) {
+			return left;
+		}
+		return left + right;
+	};
+
+	@Override
+	TtlReducingState<?, String, Integer> createState() {
+		ReduceFunction<TtlValue<Integer>> reduceWithTtl = new TtlReduceFunction<>(REDUCE, ttlConfig, timeProvider);
+		return new TtlReducingState<>(new MockInternalReducingState<>(reduceWithTtl), ttlConfig, timeProvider, null);
+	}
+
+	@Override
+	void initTestValues() {
+		updateEmpty = 5;
+		updateUnexpired = 7;
+		updateExpired = 6;
+		// expected reads: 12 is 5 + 7, whereas the read after the expired update is the bare 6
+		getUpdateEmpty = 5;
+		getUnexpired = 12;
+		getUpdateExpired = 6;
+
+		updater = v -> ttlState.add(v);
+		getter = () -> ttlState.get();
+		originalGetter = () -> ttlState.original.get();
+	}
+
+	/** Delegates to {@code getIntegerMergeResult} inherited from the integer merging base class. */
+	@Override
+	Integer getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
+	}
+
+	/** Reducing state mock: adding and merging both combine values through the supplied reduce function. */
+	private static class MockInternalReducingState<K, N, T>
+		extends MockInternalMergingState<K, N, T, T, T> implements InternalReducingState<K, N, T> {
+		private final ReduceFunction<T> reduceFunction;
+
+		private MockInternalReducingState(ReduceFunction<T> reduceFunction) {
+			this.reduceFunction = reduceFunction;
+		}
+
+		@Override
+		public T get() {
+			return getInternal();
+		}
+
+		@Override
+		public void add(T value) throws Exception {
+			updateInternal(reduceFunction.reduce(get(), value));
+		}
+
+		@Override
+		T mergeState(T t, T nAcc) throws Exception {
+			return reduceFunction.reduce(t, nAcc);
+		}
+	}
+}