You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/20 08:26:58 UTC
flink git commit: [FLINK-9858][tests] State TTL End-to-End Test
Repository: flink
Updated Branches:
refs/heads/master 46334e2f3 -> 01cf808ee
[FLINK-9858][tests] State TTL End-to-End Test
This closes #6361.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01cf808e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01cf808e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01cf808e
Branch: refs/heads/master
Commit: 01cf808ee863f4b0f429f20ba05fd2e322842e48
Parents: 46334e2
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Fri Jul 13 19:27:35 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 20 10:26:28 2018 +0200
----------------------------------------------------------------------
.../flink-stream-state-ttl-test/pom.xml | 104 +++++++++++
.../tests/DataStreamStateTTLTestProgram.java | 100 ++++++++++
.../flink/streaming/tests/TtlStateUpdate.java | 45 +++++
.../streaming/tests/TtlStateUpdateSource.java | 78 ++++++++
.../tests/TtlVerifyUpdateFunction.java | 187 +++++++++++++++++++
.../tests/verify/AbstractTtlStateVerifier.java | 105 +++++++++++
.../verify/TtlAggregatingStateVerifier.java | 107 +++++++++++
.../tests/verify/TtlFoldingStateVerifier.java | 87 +++++++++
.../tests/verify/TtlListStateVerifier.java | 78 ++++++++
.../tests/verify/TtlMapStateVerifier.java | 94 ++++++++++
.../tests/verify/TtlReducingStateVerifier.java | 86 +++++++++
.../tests/verify/TtlStateVerifier.java | 60 ++++++
.../tests/verify/TtlUpdateContext.java | 71 +++++++
.../tests/verify/TtlValueStateVerifier.java | 66 +++++++
.../tests/verify/TtlVerificationContext.java | 69 +++++++
.../streaming/tests/verify/ValueWithTs.java | 107 +++++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 3 +
flink-end-to-end-tests/test-scripts/common.sh | 37 ++--
.../test-scripts/test_stream_state_ttl.sh | 83 ++++++++
20 files changed, 1555 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
new file mode 100644
index 0000000..ad04f3e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-stream-state-ttl-test</artifactId>
+ <name>flink-stream-state-ttl-test</name>
+ <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- Flink core and streaming APIs use "provided" scope, so they are not bundled into the shaded test jar. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Use the build's Scala binary version instead of a hard-coded suffix,
+             consistent with the flink-streaming-java dependency above. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Source of DataStreamAllroundTestJobFactory#setupEnvironment, which the test program imports statically. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-datastream-allround-test</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+ <!-- Packages the test job and its non-excluded dependencies into a single shaded jar. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <!-- Run the shade goal as part of the package phase. -->
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <!-- File name (without extension) of the shaded jar. -->
+ <finalName>DataStreamStateTTLTestProgram</finalName>
+ <artifactSet>
+ <!-- Artifacts left out of the shaded jar: JSR-305 annotations and the logging libraries. -->
+ <excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <!-- Applies to every bundled artifact: drop jar signature files from META-INF. -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <!-- Writes the job's entry point as the main class into the jar manifest. -->
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.tests.DataStreamStateTTLTestProgram</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
new file mode 100644
index 0000000..f4c9619
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+/**
+ * A test job for State TTL feature.
+ *
+ * <p>The test pipeline does the following:
+ * <ul>
+ *     <li>generates random keyed state updates for each state TTL verifier (state type)</li>
+ *     <li>performs update of created state with TTL for each verifier</li>
+ *     <li>keeps previous updates in other state</li>
+ *     <li>verifies expected result of last update against preserved history of updates</li>
+ * </ul>
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>update_generator_source.keyspace (int, default - 100): Number of different keys for updates emitted by the update generator.</li>
+ *     <li>update_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting updates in the update generator. Set to 0 to disable sleeping.</li>
+ *     <li>update_generator_source.sleep_after_elements (long, default - 0): Number of updates to emit before sleeping in the update generator. Set to 0 to disable sleeping.</li>
+ *     <li>state_ttl_verifier.ttl_milli (long, default - 1000): State time-to-live.</li>
+ *     <li>report_stat.after_updates_num (long, default - 200): Report state update statistics after certain number of updates (average update chain length and clashes).</li>
+ * </ul>
+ */
+public class DataStreamStateTTLTestProgram {
+    private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+        .key("update_generator_source.keyspace")
+        .defaultValue(100);
+
+    private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+        .key("update_generator_source.sleep_time")
+        .defaultValue(0L);
+
+    private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+        .key("update_generator_source.sleep_after_elements")
+        .defaultValue(0L);
+
+    private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
+        .key("state_ttl_verifier.ttl_milli")
+        .defaultValue(1000L);
+
+    private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions
+        .key("report_stat.after_updates_num")
+        .defaultValue(200L);
+
+    /**
+     * Reads the program parameters, assembles the source -> keyBy -> verify -> print pipeline and executes it.
+     *
+     * @param args command line arguments, parsed with {@link ParameterTool#fromArgs}
+     * @throws Exception if the job execution fails
+     */
+    public static void main(String[] args) throws Exception {
+        final ParameterTool params = ParameterTool.fromArgs(args);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        setupEnvironment(env, params);
+
+        // Resolve every program parameter, falling back to the default of its option.
+        final int keySpace = params.getInt(
+            UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
+        final long sleepAfterElements = longParam(params, UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS);
+        final long sleepTime = longParam(params, UPDATE_GENERATOR_SRC_SLEEP_TIME);
+        final long ttlMillis = longParam(params, STATE_TTL_VERIFIER_TTL_MILLI);
+        final long reportStatAfterUpdatesNum = longParam(params, REPORT_STAT_AFTER_UPDATES_NUM);
+
+        final StateTtlConfiguration ttlConfig =
+            StateTtlConfiguration.newBuilder(Time.milliseconds(ttlMillis)).build();
+
+        final TtlStateUpdateSource updateSource =
+            new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime);
+        final TtlVerifyUpdateFunction verifyFunction =
+            new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum);
+
+        // The sink prints whatever the verify function emits, i.e. the failed verifications.
+        env.addSource(updateSource)
+            .name("TtlStateUpdateSource")
+            .keyBy(TtlStateUpdate::getKey)
+            .flatMap(verifyFunction)
+            .name("TtlVerifyUpdateFunction")
+            .addSink(new PrintSinkFunction<>())
+            .name("PrintFailedVerifications");
+
+        env.execute("State TTL test job");
+    }
+
+    /** Looks up a long-valued program parameter by the option's key, using the option's default if absent. */
+    private static long longParam(ParameterTool params, ConfigOption<Long> option) {
+        return params.getLong(option.key(), option.defaultValue());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
new file mode 100644
index 0000000..e89b544
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Randomly generated keyed state updates per state type.
+ *
+ * <p>Holds the key of the update together with one update value per verifier, looked up by verifier id.
+ */
+class TtlStateUpdate implements Serializable {
+    /** Key of this update; the test job partitions the stream by it. */
+    private final int key;
+
+    /** Update values indexed by verifier id. */
+    @Nonnull
+    private final Map<String, Object> updates;
+
+    /**
+     * @param key key of the update
+     * @param updates update values indexed by verifier id
+     */
+    TtlStateUpdate(int key, @Nonnull Map<String, Object> updates) {
+        this.updates = updates;
+        this.key = key;
+    }
+
+    /** Returns the key of this update. */
+    int getKey() {
+        return this.key;
+    }
+
+    /** Returns the update value stored under {@code verifierId}, or {@code null} if the map holds none. */
+    Object getUpdate(String verifierId) {
+        final Object updateForVerifier = this.updates.get(verifierId);
+        return updateForVerifier;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
new file mode 100644
index 0000000..6aff14e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+/**
+ * Source of randomly generated keyed state updates.
+ *
+ * <p>Every emitted {@link TtlStateUpdate} has a random key in {@code [0, maxKey)} and carries one random update
+ * for each verifier from {@link TtlStateVerifier#VERIFIERS}, created by {@link TtlStateVerifier#generateRandomUpdate}.
+ *
+ * <p>Throttling: if both {@code sleepTime} and {@code sleepAfterElements} are positive, the source pauses after
+ * every {@code sleepAfterElements} emitted updates for a randomized duration of at least {@code sleepTime}
+ * milliseconds. If either of them is 0, the source emits updates continuously without sleeping.
+ */
+class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> {
+    /** Exclusive upper bound for the randomly chosen keys. */
+    private final int maxKey;
+    /** Number of updates to emit between two sleeps; 0 disables sleeping. */
+    private final long sleepAfterElements;
+    /** Base sleep duration in milliseconds; 0 disables sleeping. */
+    private final long sleepTime;
+
+    /** Flag that determines if this source is running, i.e. generating events. */
+    private volatile boolean running = true;
+
+    TtlStateUpdateSource(int maxKey, long sleepAfterElements, long sleepTime) {
+        this.maxKey = maxKey;
+        this.sleepAfterElements = sleepAfterElements;
+        this.sleepTime = sleepTime;
+    }
+
+    /**
+     * Emits random state updates until {@link #cancel()} is called.
+     *
+     * @param ctx source context used to emit the updates under the checkpoint lock
+     * @throws Exception if the thread is interrupted while sleeping
+     */
+    @Override
+    public void run(SourceContext<TtlStateUpdate> ctx) throws Exception {
+        Random random = new Random();
+        long elementsBeforeSleep = sleepAfterElements;
+        while (running) {
+            // Emit exactly one element per iteration so that the countdown below counts elements.
+            // Previously a nested loop bounded by sleepAfterElements emitted nothing at all for
+            // sleepAfterElements == 0 and delayed the sleep to every sleepAfterElements^2 elements otherwise.
+            synchronized (ctx.getCheckpointLock()) {
+                Map<String, Object> updates = TtlStateVerifier.VERIFIERS.stream()
+                    .collect(Collectors.toMap(TtlStateVerifier::getId, TtlStateVerifier::generateRandomUpdate));
+                ctx.collect(new TtlStateUpdate(random.nextInt(maxKey), updates));
+            }
+
+            if (sleepTime > 0) {
+                if (elementsBeforeSleep == 1) {
+                    elementsBeforeSleep = sleepAfterElements;
+                    // Add random jitter in [0, sleepTime); skipped if sleepTime does not fit into an int.
+                    long rnd = sleepTime < Integer.MAX_VALUE ? random.nextInt((int) sleepTime) : 0L;
+                    Thread.sleep(rnd + sleepTime);
+                } else if (elementsBeforeSleep > 1) {
+                    --elementsBeforeSleep;
+                }
+                // elementsBeforeSleep <= 0 (sleepAfterElements == 0): never sleep.
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
new file mode 100644
index 0000000..a99a45f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
+import org.apache.flink.streaming.tests.verify.TtlUpdateContext;
+import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
+import org.apache.flink.streaming.tests.verify.ValueWithTs;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Update state with TTL for each verifier.
+ *
+ * <p>For each verifier from {@link TtlStateVerifier#VERIFIERS} this function:
+ * <ul>
+ *     <li>creates state with TTL</li>
+ *     <li>creates state of previous updates for further verification against it</li>
+ *     <li>receives random state update</li>
+ *     <li>gets state value before update</li>
+ *     <li>updates state with random value</li>
+ *     <li>gets state value after update</li>
+ *     <li>checks if this update clashes with any previous updates</li>
+ *     <li>if clashes, clears state and recreates update</li>
+ *     <li>verifies last update against previous updates</li>
+ *     <li>emits verification context in case of failure</li>
+ * </ul>
+ */
+class TtlVerifyUpdateFunction
+        extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);
+
+    @Nonnull
+    private final StateTtlConfiguration ttlConfig;
+    /** Time-to-live in milliseconds, taken from {@link #ttlConfig}. */
+    private final long ttl;
+    private final UpdateStat stat;
+
+    /** TTL state under test, indexed by verifier id. */
+    private transient Map<String, State> states;
+    /** History of applied updates, indexed by verifier id. */
+    private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;
+
+    /**
+     * @param ttlConfig TTL configuration applied to every state under test
+     * @param reportStatAfterUpdatesNum log update statistics after this many updates; values <= 0 disable reporting
+     */
+    TtlVerifyUpdateFunction(@Nonnull StateTtlConfiguration ttlConfig, long reportStatAfterUpdatesNum) {
+        this.ttlConfig = ttlConfig;
+        this.ttl = ttlConfig.getTtl().toMilliseconds();
+        this.stat = new UpdateStat(reportStatAfterUpdatesNum);
+    }
+
+    /** Applies the update of every verifier and emits the verification context of each failed verification. */
+    @Override
+    public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Exception {
+        for (TtlStateVerifier<?, ?> verifier : TtlStateVerifier.VERIFIERS) {
+            TtlVerificationContext<?, ?> verificationContext = generateUpdateAndVerificationContext(updates, verifier);
+            if (!verifier.verify(verificationContext)) {
+                out.collect(verificationContext.toString());
+            }
+        }
+    }
+
+    /**
+     * Performs the update for one verifier, records it in the update history and returns the context
+     * needed to verify it.
+     *
+     * <p>If the update clashes with the history, the state and the history are cleared and the update
+     * is performed again on the empty state.
+     */
+    private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(
+            TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception {
+        List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId());
+        Object update = updates.getUpdate(verifier.getId());
+        TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update);
+        boolean clashes = updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates);
+        if (clashes) {
+            resetState(verifier.getId());
+            prevUpdates = Collections.emptyList();
+            updateContext = performUpdate(verifier, update);
+        }
+        stat.update(clashes, prevUpdates.size());
+        prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
+        return new TtlVerificationContext<>(updates.getKey(), verifier.getId(), prevUpdates, updateContext);
+    }
+
+    /** Copies the recorded update history of the given verifier into a list. */
+    private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception {
+        return StreamSupport
+            .stream(prevUpdatesByVerifierId.get(verifierId).get().spliterator(), false)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Reads the state, applies the update and reads the state again, taking wall-clock timestamps
+     * before the first read and after the second one.
+     */
+    private TtlUpdateContext<?, ?> performUpdate(
+            TtlStateVerifier<?, ?> verifier, Object update) throws Exception {
+        State state = states.get(verifier.getId());
+        long timestampBeforeUpdate = System.currentTimeMillis();
+        Object valueBeforeUpdate = verifier.get(state);
+        verifier.update(state, update);
+        Object updatedValue = verifier.get(state);
+        return new TtlUpdateContext<>(timestampBeforeUpdate,
+            valueBeforeUpdate, update, updatedValue, System.currentTimeMillis());
+    }
+
+    /** An update clashes if it was too slow itself or clashes with at least one previous update. */
+    private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, List<ValueWithTs<?>> prevUpdates) {
+        return tooSlow(update) ||
+            (!prevUpdates.isEmpty() && prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update)));
+    }
+
+    /** Returns true if the time between the two timestamps of the update is at least the TTL. */
+    private boolean tooSlow(ValueWithTs<?> update) {
+        return update.getTimestampAfterUpdate() - update.getTimestampBeforeUpdate() >= ttl;
+    }
+
+    /**
+     * Returns true if the interval [before, after] of the previous update, shifted by the TTL, overlaps the
+     * interval [before, after] of the next update.
+     */
+    private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> nextUpdate) {
+        return prevUpdate.getTimestampAfterUpdate() + ttl >= nextUpdate.getTimestampBeforeUpdate() &&
+            prevUpdate.getTimestampBeforeUpdate() + ttl <= nextUpdate.getTimestampAfterUpdate();
+    }
+
+    /** Clears both the TTL state and the update history of the given verifier. */
+    private void resetState(String verifierId) {
+        states.get(verifierId).clear();
+        prevUpdatesByVerifierId.get(verifierId).clear();
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) {
+        // Intentionally empty: this function has no explicit snapshot logic.
+    }
+
+    /** Creates, per verifier, the TTL state under test and the list state keeping the update history. */
+    @Override
+    public void initializeState(FunctionInitializationContext context) {
+        states = TtlStateVerifier.VERIFIERS.stream()
+            .collect(Collectors.toMap(TtlStateVerifier::getId, v -> v.createState(context, ttlConfig)));
+        prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
+            .collect(Collectors.toMap(TtlStateVerifier::getId, v -> {
+                Preconditions.checkNotNull(v);
+                TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer());
+                ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>(
+                    "TtlPrevValueState_" + v.getId(), typeSerializer);
+                KeyedStateStore store = context.getKeyedStateStore();
+                return store.getListState(stateDesc);
+            }));
+    }
+
+    /** Counters for update statistics, logged periodically. */
+    private static class UpdateStat implements Serializable {
+        final long reportStatAfterUpdatesNum;
+        long updates = 0;
+        long clashes = 0;
+        long prevUpdatesNum = 0;
+
+        UpdateStat(long reportStatAfterUpdatesNum) {
+            this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
+        }
+
+        /**
+         * Records one update and logs the statistics after every {@code reportStatAfterUpdatesNum} updates.
+         *
+         * @param clash whether the update clashed with previous updates
+         * @param prevUpdatesSize length of the update history the update was verified against
+         */
+        void update(boolean clash, long prevUpdatesSize) {
+            updates++;
+            if (clash) {
+                clashes++;
+            }
+            prevUpdatesNum += prevUpdatesSize;
+            // Guard against a non-positive interval: the modulo would otherwise throw
+            // ArithmeticException for 0 and fail the job.
+            if (reportStatAfterUpdatesNum > 0 && updates % reportStatAfterUpdatesNum == 0) {
+                LOG.info(String.format("Avg update chain length: %d, clash stat: %d/%d",
+                    prevUpdatesNum / updates, clashes, updates));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
new file mode 100644
index 0000000..c56ff19
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+/**
+ * Base class for State TTL verifiers.
+ *
+ * <p>Holds the state descriptor, adapts the untyped {@link TtlStateVerifier} methods to typed
+ * {@code *Internal} hooks by casting, and implements the generic verification of an update
+ * against the history of previous updates.
+ *
+ * @param <D> type of the state descriptor
+ * @param <S> type of the state created from the descriptor
+ * @param <SV> value type of the state descriptor
+ * @param <UV> type of the update value
+ * @param <GV> type of the value returned when reading the state
+ */
+abstract class AbstractTtlStateVerifier<D extends StateDescriptor<S, SV>, S extends State, SV, UV, GV>
+ implements TtlStateVerifier<UV, GV> {
+ /** Random generator shared by all verifiers. */
+ static final Random RANDOM = new Random();
+
+ @Nonnull
+ final D stateDesc;
+
+ AbstractTtlStateVerifier(@Nonnull D stateDesc) {
+ this.stateDesc = stateDesc;
+ }
+
+ /** Returns a random string from {@link StringUtils#getRandomString} (2 and 20 are presumably the length bounds - TODO confirm). */
+ @Nonnull
+ static String randomString() {
+ return StringUtils.getRandomString(RANDOM, 2, 20);
+ }
+
+ /** Enables TTL with the given configuration on the descriptor and then creates the state from it. */
+ @SuppressWarnings("unchecked")
+ @Override
+ @Nonnull
+ public State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig) {
+ // TTL has to be enabled on the descriptor before the state is created from it.
+ stateDesc.enableTimeToLive(ttlConfig);
+ return createState(context);
+ }
+
+ /** Creates the concrete state from {@link #stateDesc}. */
+ abstract State createState(FunctionInitializationContext context);
+
+ /** Returns the serializer of the state descriptor, cast to the update value type. */
+ @SuppressWarnings("unchecked")
+ @Override
+ @Nonnull
+ public TypeSerializer<UV> getUpdateSerializer() {
+ return (TypeSerializer<UV>) stateDesc.getSerializer();
+ }
+
+ /** Casts the state to {@code S} and reads its current value via {@link #getInternal}. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public GV get(@Nonnull State state) throws Exception {
+ return getInternal((S) state);
+ }
+
+ /** Reads the current value of the typed state. */
+ abstract GV getInternal(@Nonnull S state) throws Exception;
+
+ /** Casts the state to {@code S} and the update to {@code UV} and applies it via {@link #updateInternal}. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void update(@Nonnull State state, Object update) throws Exception {
+ updateInternal((S) state, (UV) update);
+ }
+
+ /** Applies the update to the typed state. */
+ abstract void updateInternal(@Nonnull S state, UV update) throws Exception;
+
+ /**
+ * Checks the values observed before and after the update against the values expected from the update history.
+ *
+ * @param verificationContextRaw previous updates and the context of the update to verify
+ * @return true if both the value before the update and the updated value equal their expected values
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContextRaw) {
+ TtlVerificationContext<UV, GV> verificationContext = (TtlVerificationContext<UV, GV>) verificationContextRaw;
+ // Copy the history so that appending the current update below does not modify the context's list.
+ List<ValueWithTs<UV>> updates = new ArrayList<>(verificationContext.getPrevUpdates());
+ long currentTimestamp = verificationContext.getUpdateContext().getTimestampBeforeUpdate();
+ // Expected state value before the update, computed from the previous updates only.
+ GV prevValue = expected(updates, currentTimestamp);
+ GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate();
+ ValueWithTs<UV> update = verificationContext.getUpdateContext().getUpdateWithTs();
+ GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue();
+ // Order matters: the update is appended only after prevValue has been computed.
+ updates.add(update);
+ // Expected state value after the update, evaluated at the same timestamp as prevValue.
+ GV expectedValue = expected(updates, currentTimestamp);
+ return Objects.equals(valueBeforeUpdate, prevValue) && Objects.equals(updatedValue, expectedValue);
+ }
+
+ /** Computes the state value expected at {@code currentTimestamp} after the given updates. */
+ abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long currentTimestamp);
+
+ /** Returns true if {@code lastTimestamp} plus the TTL of the state descriptor is not after {@code currentTimestamp}. */
+ boolean expired(long lastTimestamp, long currentTimestamp) {
+ return lastTimestamp + stateDesc.getTtlConfig().getTtl().toMilliseconds() <= currentTimestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
new file mode 100644
index 0000000..960bbe7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Verifier for {@link AggregatingState} with TTL: integer updates are summed into a long
+ * accumulator that starts at 3, and the state value is read back as the decimal string of the sum.
+ */
+class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier<
+ AggregatingStateDescriptor<Integer, Long, String>, AggregatingState<Integer, String>, Long, Integer, String> {
+ /** Aggregate function shared by the state descriptor and by the expectation model in {@code expected}. */
+ private static final AggregateFunction<Integer, Long, String> AGG_FUNC =
+ new AggregateFunction<Integer, Long, String>() {
+ @Override
+ public Long createAccumulator() {
+ return 3L;
+ }
+
+ @Override
+ public Long add(Integer value, Long accumulator) {
+ return value + accumulator;
+ }
+
+ @Override
+ public String getResult(Long accumulator) {
+ return Long.toString(accumulator);
+ }
+
+ @Override
+ public Long merge(Long a, Long b) {
+ return b + a;
+ }
+ };
+
+ TtlAggregatingStateVerifier() {
+ super(new AggregatingStateDescriptor<>("TtlAggregatingStateVerifier", AGG_FUNC, LongSerializer.INSTANCE));
+ }
+
+ @Override
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context) {
+ return context.getKeyedStateStore().getAggregatingState(stateDesc);
+ }
+
+ @Override
+ @Nonnull
+ public TypeSerializer<Integer> getUpdateSerializer() {
+ return IntSerializer.INSTANCE;
+ }
+
+ /** Produces a random integer in the range [0, 100). */
+ @Override
+ @Nonnull
+ public Integer generateRandomUpdate() {
+ return RANDOM.nextInt(100);
+ }
+
+ @Override
+ String getInternal(@Nonnull AggregatingState<Integer, String> state) throws Exception {
+ return state.get();
+ }
+
+ @Override
+ void updateInternal(@Nonnull AggregatingState<Integer, String> state, Integer update) throws Exception {
+ state.add(update);
+ }
+
+ /**
+ * Models the aggregate: the accumulator restarts whenever the gap between two consecutive
+ * updates reaches the TTL, and the result is null once the last update itself has expired.
+ */
+ @Override
+ String expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+ if (updates.isEmpty()) {
+ return null;
+ }
+ long previousTs = updates.get(0).getTimestampAfterUpdate();
+ long sum = AGG_FUNC.createAccumulator();
+ for (ValueWithTs<Integer> entry : updates) {
+ long entryTs = entry.getTimestampAfterUpdate();
+ // Start from a fresh accumulator if the previous write had already expired at this write.
+ long base = expired(previousTs, entryTs) ? AGG_FUNC.createAccumulator() : sum;
+ sum = AGG_FUNC.add(entry.getValue(), base);
+ previousTs = entryTs;
+ }
+ if (expired(previousTs, currentTimestamp)) {
+ return null;
+ }
+ return AGG_FUNC.getResult(sum);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
new file mode 100644
index 0000000..c1cc761
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Verifier for {@link FoldingState} with TTL: integer updates are folded by addition
+ * into a long accumulator whose initial value is {@code INIT_VAL}.
+ */
+@SuppressWarnings("deprecation")
+class TtlFoldingStateVerifier extends AbstractTtlStateVerifier<
+ FoldingStateDescriptor<Integer, Long>, FoldingState<Integer, Long>, Long, Integer, Long> {
+ /** Initial accumulator value used by the fold function and by the expectation model. */
+ private static final long INIT_VAL = 5L;
+
+ TtlFoldingStateVerifier() {
+ super(new FoldingStateDescriptor<>(
+ "TtlFoldingStateVerifier", INIT_VAL, (element, folded) -> folded + element, LongSerializer.INSTANCE));
+ }
+
+ @Override
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context) {
+ return context.getKeyedStateStore().getFoldingState(stateDesc);
+ }
+
+ @Override
+ @Nonnull
+ public TypeSerializer<Integer> getUpdateSerializer() {
+ return IntSerializer.INSTANCE;
+ }
+
+ /** Produces a random integer in the range [0, 100). */
+ @Override
+ @Nonnull
+ public Integer generateRandomUpdate() {
+ return RANDOM.nextInt(100);
+ }
+
+ @Override
+ Long getInternal(@Nonnull FoldingState<Integer, Long> state) throws Exception {
+ return state.get();
+ }
+
+ @Override
+ void updateInternal(@Nonnull FoldingState<Integer, Long> state, Integer update) throws Exception {
+ state.add(update);
+ }
+
+ /**
+ * Models the fold: the accumulator restarts at {@code INIT_VAL} whenever the gap between two
+ * consecutive updates reaches the TTL, and the result is null once the last update has expired.
+ */
+ @Override
+ Long expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+ if (updates.isEmpty()) {
+ return null;
+ }
+ long previousTs = updates.get(0).getTimestampAfterUpdate();
+ long folded = INIT_VAL;
+ for (ValueWithTs<Integer> entry : updates) {
+ long entryTs = entry.getTimestampAfterUpdate();
+ // Start over if the previous write had already expired at this write.
+ long base = expired(previousTs, entryTs) ? INIT_VAL : folded;
+ folded = base + entry.getValue();
+ previousTs = entryTs;
+ }
+ if (expired(previousTs, currentTimestamp)) {
+ return null;
+ }
+ return folded;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
new file mode 100644
index 0000000..b355aa9
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Verifier for {@link ListState} with TTL: every update appends one random string,
+ * and each element is expected to expire individually based on its own write timestamp.
+ */
+class TtlListStateVerifier extends AbstractTtlStateVerifier<
+ ListStateDescriptor<String>, ListState<String>, List<String>, String, List<String>> {
+ TtlListStateVerifier() {
+ super(new ListStateDescriptor<>("TtlListStateVerifier", StringSerializer.INSTANCE));
+ }
+
+ @Override
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context) {
+ return context.getKeyedStateStore().getListState(stateDesc);
+ }
+
+ @Override
+ @Nonnull
+ public TypeSerializer<String> getUpdateSerializer() {
+ return StringSerializer.INSTANCE;
+ }
+
+ @Override
+ @Nonnull
+ public String generateRandomUpdate() {
+ return randomString();
+ }
+
+ /** Copies the current contents of the list state into a list, in iteration order. */
+ @Override
+ @Nonnull
+ List<String> getInternal(@Nonnull ListState<String> state) throws Exception {
+ Iterable<String> stored = state.get();
+ return StreamSupport.stream(stored.spliterator(), false).collect(Collectors.toList());
+ }
+
+ @Override
+ void updateInternal(@Nonnull ListState<String> state, String update) throws Exception {
+ state.add(update);
+ }
+
+ /** Keeps, in their original order, the values of all updates that have not expired at {@code currentTimestamp}. */
+ @Override
+ @Nonnull
+ List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) {
+ return updates.stream()
+ .filter(entry -> !expired(entry.getTimestampAfterUpdate(), currentTimestamp))
+ .collect(Collectors.mapping(ValueWithTs::getValue, Collectors.toList()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
new file mode 100644
index 0000000..a9d6b36
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+class TtlMapStateVerifier extends AbstractTtlStateVerifier<
+ MapStateDescriptor<String, String>, MapState<String, String>,
+ Map<String, String>, Tuple2<String, String>, Map<String, String>> {
+ private static final List<String> KEYS = new ArrayList<>();
+ static {
+ IntStream.range(0, RANDOM.nextInt(5) + 5).forEach(i -> KEYS.add(randomString()));
+ }
+
+ TtlMapStateVerifier() {
+ super(new MapStateDescriptor<>("TtlMapStateVerifier", StringSerializer.INSTANCE, StringSerializer.INSTANCE));
+ }
+
+ @Override
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context) {
+ return context.getKeyedStateStore().getMapState(stateDesc);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ @Nonnull
+ public TypeSerializer<Tuple2<String, String>> getUpdateSerializer() {
+ return new TupleSerializer(
+ Tuple2.class, new TypeSerializer[] {StringSerializer.INSTANCE, StringSerializer.INSTANCE});
+ }
+
+ @Override
+ @Nonnull
+ public Tuple2<String, String> generateRandomUpdate() {
+ return Tuple2.of(KEYS.get(RANDOM.nextInt(KEYS.size())), randomString());
+ }
+
+ @Override
+ @Nonnull
+ Map<String, String> getInternal(@Nonnull MapState<String, String> state) throws Exception {
+ return StreamSupport.stream(state.entries().spliterator(), false)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @Override
+ void updateInternal(@Nonnull MapState<String, String> state, Tuple2<String, String> update) throws Exception {
+ state.put(update.f0, update.f1);
+ }
+
+ @Override
+ @Nonnull
+ Map<String, String> expected(@Nonnull List<ValueWithTs<Tuple2<String, String>>> updates, long currentTimestamp) {
+ return updates.stream()
+ .collect(Collectors.groupingBy(u -> u.getValue().f0))
+ .entrySet().stream()
+ .map(e -> e.getValue().get(e.getValue().size() - 1))
+ .filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp))
+ .map(ValueWithTs::getValue)
+ .collect(Collectors.toMap(u -> u.f0, u -> u.f1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
new file mode 100644
index 0000000..773be05
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/** Verifier for {@link ReducingState} with TTL: integer updates are reduced by addition. */
+class TtlReducingStateVerifier extends AbstractTtlStateVerifier<
+ ReducingStateDescriptor<Integer>, ReducingState<Integer>, Integer, Integer, Integer> {
+ TtlReducingStateVerifier() {
+ super(new ReducingStateDescriptor<>(
+ "TtlReducingStateVerifier",
+ (ReduceFunction<Integer>) (left, right) -> left + right,
+ IntSerializer.INSTANCE));
+ }
+
+ @Override
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context) {
+ return context.getKeyedStateStore().getReducingState(stateDesc);
+ }
+
+ @Override
+ @Nonnull
+ public TypeSerializer<Integer> getUpdateSerializer() {
+ return IntSerializer.INSTANCE;
+ }
+
+ /** Produces a random integer in the range [0, 100). */
+ @Override
+ @Nonnull
+ public Integer generateRandomUpdate() {
+ return RANDOM.nextInt(100);
+ }
+
+ @Override
+ Integer getInternal(@Nonnull ReducingState<Integer> state) throws Exception {
+ return state.get();
+ }
+
+ @Override
+ void updateInternal(@Nonnull ReducingState<Integer> state, Integer update) throws Exception {
+ state.add(update);
+ }
+
+ /**
+ * Models the reduction: the running sum restarts at zero whenever the gap between two consecutive
+ * updates reaches the TTL, and the result is null once the last update itself has expired.
+ */
+ @Override
+ Integer expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+ if (updates.isEmpty()) {
+ return null;
+ }
+ long previousTs = updates.get(0).getTimestampAfterUpdate();
+ int sum = 0;
+ for (ValueWithTs<Integer> entry : updates) {
+ long entryTs = entry.getTimestampAfterUpdate();
+ // Start over if the previous write had already expired at this write.
+ int base = expired(previousTs, entryTs) ? 0 : sum;
+ sum = base + entry.getValue();
+ previousTs = entryTs;
+ }
+ if (expired(previousTs, currentTimestamp)) {
+ return null;
+ }
+ return sum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
new file mode 100644
index 0000000..e1c2e07
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * TTL state verifier interface.
+ *
+ * @param <UV> type of a single update applied to the state
+ * @param <GV> type of the value read back from the state
+ */
+public interface TtlStateVerifier<UV, GV> {
+ /** One verifier instance for each state type covered by this test. */
+ List<TtlStateVerifier<?, ?>> VERIFIERS = Arrays.asList(
+ new TtlValueStateVerifier(),
+ new TtlListStateVerifier(),
+ new TtlMapStateVerifier(),
+ new TtlAggregatingStateVerifier(),
+ new TtlReducingStateVerifier(),
+ new TtlFoldingStateVerifier()
+ );
+
+ /** Identifier of this verifier; by default the simple name of the implementing class. */
+ @Nonnull
+ default String getId() {
+ return this.getClass().getSimpleName();
+ }
+
+ /** Creates the state object this verifier operates on, given the function context and the TTL configuration to use. */
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig);
+
+ /** Serializer for values of the update type {@code UV}. */
+ @Nonnull
+ TypeSerializer<UV> getUpdateSerializer();
+
+ /** Generates a random update to apply to the state. */
+ UV generateRandomUpdate();
+
+ /** Reads the current value of the given state. */
+ GV get(@Nonnull State state) throws Exception;
+
+ /** Applies the given update to the given state. */
+ void update(@Nonnull State state, Object update) throws Exception;
+
+ /** Returns whether the values recorded in the verification context match what this verifier expects. */
+ boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContext);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
new file mode 100644
index 0000000..959340b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/**
+ * Contains context relevant for state update with TTL: the values read before and after
+ * the update, the update itself and the timestamps taken around it.
+ */
+public class TtlUpdateContext<UV, GV> implements Serializable {
+ private final long timestampBeforeUpdate;
+ private final GV valueBeforeUpdate;
+ private final UV update;
+ private final GV updatedValue;
+ private final long timestampAfterUpdate;
+
+ public TtlUpdateContext(
+ long timestampBeforeUpdate,
+ GV valueBeforeUpdate, UV update, GV updatedValue,
+ long timestampAfterUpdate) {
+ this.timestampBeforeUpdate = timestampBeforeUpdate;
+ this.valueBeforeUpdate = valueBeforeUpdate;
+ this.update = update;
+ this.updatedValue = updatedValue;
+ this.timestampAfterUpdate = timestampAfterUpdate;
+ }
+
+ long getTimestampBeforeUpdate() {
+ return timestampBeforeUpdate;
+ }
+
+ GV getValueBeforeUpdate() {
+ return valueBeforeUpdate;
+ }
+
+ /** Wraps the update together with both timestamps; a new wrapper is created on every call. */
+ @Nonnull
+ public ValueWithTs<UV> getUpdateWithTs() {
+ return new ValueWithTs<>(update, timestampBeforeUpdate, timestampAfterUpdate);
+ }
+
+ GV getUpdatedValue() {
+ return updatedValue;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder text = new StringBuilder("TtlUpdateContext{");
+ text.append("timestampBeforeUpdate=").append(timestampBeforeUpdate);
+ text.append(", valueBeforeUpdate=").append(valueBeforeUpdate);
+ text.append(", update=").append(update);
+ text.append(", updatedValue=").append(updatedValue);
+ text.append(", timestampAfterUpdate=").append(timestampAfterUpdate);
+ return text.append('}').toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
new file mode 100644
index 0000000..fa4929b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Verifier for {@link ValueState} with TTL: every update overwrites the state with a random string,
+ * so the expected value is the latest update unless it has expired.
+ */
+class TtlValueStateVerifier
+ extends AbstractTtlStateVerifier<ValueStateDescriptor<String>, ValueState<String>, String, String, String> {
+ TtlValueStateVerifier() {
+ super(new ValueStateDescriptor<>("TtlValueStateVerifier", StringSerializer.INSTANCE));
+ }
+
+ @Override
+ @Nonnull
+ State createState(@Nonnull FunctionInitializationContext context) {
+ return context.getKeyedStateStore().getState(stateDesc);
+ }
+
+ /** Produces a random string to be written as the next value. */
+ @Override
+ @Nonnull
+ public String generateRandomUpdate() {
+ return randomString();
+ }
+
+ @Override
+ String getInternal(@Nonnull ValueState<String> state) throws Exception {
+ return state.value();
+ }
+
+ @Override
+ void updateInternal(@Nonnull ValueState<String> state, String update) throws Exception {
+ state.update(update);
+ }
+
+ /**
+ * Returns the value of the most recent update, or null when there are no updates
+ * or the most recent one has expired at {@code currentTimestamp}.
+ */
+ @Override
+ String expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) {
+ if (updates.isEmpty()) {
+ return null;
+ }
+ ValueWithTs<String> lastUpdate = updates.get(updates.size() - 1);
+ return expired(lastUpdate.getTimestampAfterUpdate(), currentTimestamp) ? null : lastUpdate.getValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
new file mode 100644
index 0000000..4c985cd
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Data to verify state update with TTL: the key, the id of the responsible verifier,
+ * the updates recorded before the one under test and the context of the update under test.
+ */
+public class TtlVerificationContext<UV, GV> implements Serializable {
+ private final int key;
+ @Nonnull
+ private final String verifierId;
+ @Nonnull
+ private final List<ValueWithTs<UV>> prevUpdates;
+ @Nonnull
+ private final TtlUpdateContext<UV, GV> updateContext;
+
+ /** Copies the untyped previous updates into a new list typed for this verifier and stores the update context. */
+ @SuppressWarnings("unchecked")
+ public TtlVerificationContext(
+ int key,
+ @Nonnull String verifierId,
+ @Nonnull List<ValueWithTs<?>> prevUpdates,
+ @Nonnull TtlUpdateContext<?, ?> updateContext) {
+ this.key = key;
+ this.verifierId = verifierId;
+ List<ValueWithTs<UV>> typedUpdates = new ArrayList<>();
+ for (ValueWithTs<?> rawUpdate : prevUpdates) {
+ typedUpdates.add((ValueWithTs<UV>) rawUpdate);
+ }
+ this.prevUpdates = typedUpdates;
+ this.updateContext = (TtlUpdateContext<UV, GV>) updateContext;
+ }
+
+ @Nonnull
+ List<ValueWithTs<UV>> getPrevUpdates() {
+ return prevUpdates;
+ }
+
+ @Nonnull
+ TtlUpdateContext<UV, GV> getUpdateContext() {
+ return updateContext;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder text = new StringBuilder("TtlVerificationContext{");
+ text.append("key=").append(key);
+ text.append(", verifierId='").append(verifierId).append('\'');
+ text.append(", prevUpdates=").append(prevUpdates);
+ text.append(", updateContext=").append(updateContext);
+ return text.append('}').toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
new file mode 100644
index 0000000..9302377
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/**
+ * User state value with timestamps before and after update.
+ *
+ * <p>All three fields are final and assigned once in the constructor.
+ *
+ * @param <V> type of the wrapped user value
+ */
+public class ValueWithTs<V> implements Serializable {
+	private final V value;
+	private final long timestampBeforeUpdate;
+	private final long timestampAfterUpdate;
+
+	/**
+	 * @param value the user value
+	 * @param timestampBeforeUpdate timestamp taken before the update
+	 * @param timestampAfterUpdate timestamp taken after the update
+	 */
+	public ValueWithTs(V value, long timestampBeforeUpdate, long timestampAfterUpdate) {
+		this.value = value;
+		this.timestampBeforeUpdate = timestampBeforeUpdate;
+		this.timestampAfterUpdate = timestampAfterUpdate;
+	}
+
+	V getValue() {
+		return value;
+	}
+
+	public long getTimestampBeforeUpdate() {
+		return timestampBeforeUpdate;
+	}
+
+	public long getTimestampAfterUpdate() {
+		return timestampAfterUpdate;
+	}
+
+	@Override
+	public String toString() {
+		return "ValueWithTs{" +
+			"value=" + value +
+			", timestampBeforeUpdate=" + timestampBeforeUpdate +
+			", timestampAfterUpdate=" + timestampAfterUpdate +
+			'}';
+	}
+
+	/**
+	 * Serializer for {@link ValueWithTs}.
+	 *
+	 * <p>A {@link CompositeSerializer} over three fields, in this order: the user value (index 0),
+	 * the timestamp before the update (index 1) and the timestamp after the update (index 2).
+	 */
+	public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
+
+		/**
+		 * @param userValueSerializer serializer for the wrapped user value; both timestamps are
+		 *     serialized with {@link LongSerializer#INSTANCE}
+		 */
+		public Serializer(TypeSerializer<?> userValueSerializer) {
+			// NOTE(review): the leading `true` presumably marks the target type as immutable,
+			// which would match setField() being unsupported -- confirm against CompositeSerializer.
+			super(true, userValueSerializer, LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+		}
+
+		@SuppressWarnings("unchecked")
+		Serializer(PrecomputedParameters precomputed, TypeSerializer<?>... fieldSerializers) {
+			super(precomputed, fieldSerializers);
+		}
+
+		/** Builds a {@link ValueWithTs} from the three field values, in field-index order. */
+		@Override
+		public ValueWithTs<?> createInstance(@Nonnull Object ... values) {
+			return new ValueWithTs<>(values[0], (Long) values[1], (Long) values[2]);
+		}
+
+		/** Always throws: the fields of {@link ValueWithTs} are final and cannot be reassigned. */
+		@Override
+		protected void setField(@Nonnull ValueWithTs<?> value, int index, Object fieldValue) {
+			throw new UnsupportedOperationException();
+		}
+
+		/**
+		 * Returns the field at the given index (0: value, 1: timestamp before, 2: timestamp after).
+		 *
+		 * @throws FlinkRuntimeException for any other index
+		 */
+		@Override
+		protected Object getField(@Nonnull ValueWithTs<?> value, int index) {
+			switch (index) {
+				case 0:
+					return value.getValue();
+				case 1:
+					return value.getTimestampBeforeUpdate();
+				case 2:
+					return value.getTimestampAfterUpdate();
+				default:
+					throw new FlinkRuntimeException("Unexpected field index for ValueWithTs");
+			}
+		}
+
+		/**
+		 * Re-creates this serializer from precomputed parameters and the given field serializers.
+		 *
+		 * <p>All field serializers are forwarded. Passing only the first one would leave the new
+		 * instance without serializers for the two timestamp fields, although
+		 * {@link #createInstance(Object...)} and {@link #getField(ValueWithTs, int)} address
+		 * three fields.
+		 */
+		@Override
+		protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance(
+				PrecomputedParameters precomputed,
+				TypeSerializer<?>... originalSerializers) {
+			return new Serializer(precomputed, originalSerializers);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 86dd3db..4abf595 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -50,6 +50,7 @@ under the License.
<module>flink-elasticsearch5-test</module>
<module>flink-quickstart-test</module>
<module>flink-confluent-schema-registry</module>
+ <module>flink-stream-state-ttl-test</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 15c73d5..dc8424f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -102,5 +102,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip
run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
+run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
+run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
+
printf "\n[PASS] All tests passed\n"
exit 0
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index c78afe7..f4563cc 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -240,6 +240,15 @@ function start_cluster {
done
}
+function start_taskmanagers {
+ tmnum=$1
+ echo "Start ${tmnum} more task managers"
+ for (( c=0; c<tmnum; c++ ))
+ do
+ $FLINK_DIR/bin/taskmanager.sh start
+ done
+}
+
function start_and_wait_for_tm {
tm_query_result=$(curl -s "http://localhost:8081/taskmanagers")
@@ -456,18 +465,17 @@ function s3_delete {
https://${bucket}.s3.amazonaws.com/${s3_file}
}
-# This function starts the given number of task managers and monitors their processes. If a task manager process goes
-# away a replacement is started.
+# This function starts the given number of task managers and monitors their processes.
+# If a task manager process goes away a replacement is started.
function tm_watchdog {
local expectedTm=$1
while true;
do
runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
count=$((expectedTm-runningTm))
- for (( c=0; c<count; c++ ))
- do
- $FLINK_DIR/bin/taskmanager.sh start > /dev/null
- done
+ if (( count != 0 )); then
+ start_taskmanagers ${count} > /dev/null
+ fi
sleep 5;
done
}
@@ -508,7 +516,8 @@ function rollback_flink_slf4j_metric_reporter() {
function get_metric_processed_records {
OPERATOR=$1
- N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
+ JOB_NAME="${2:-General purpose test job}"
+ N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
if [ -z $N ]; then
N=0
fi
@@ -517,7 +526,8 @@ function get_metric_processed_records {
function get_num_metric_samples {
OPERATOR=$1
- N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
+ JOB_NAME="${2:-General purpose test job}"
+ N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
if [ -z $N ]; then
N=0
fi
@@ -527,13 +537,14 @@ function get_num_metric_samples {
function wait_oper_metric_num_in_records {
OPERATOR=$1
MAX_NUM_METRICS="${2:-200}"
- NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
- OLD_NUM_METRICS=${3:-${NUM_METRICS}}
+    JOB_NAME="${3:-General purpose test job}"
+    # Double quotes are required: single quotes would pass the literal text ${JOB_NAME},
+    # so the baseline sample count would be taken for a job name that never occurs in the logs.
+    NUM_METRICS=$(get_num_metric_samples ${OPERATOR} "${JOB_NAME}")
+    OLD_NUM_METRICS=${4:-${NUM_METRICS}}
# monitor the numRecordsIn metric of the state machine operator in the second execution
# we let the test finish once the second restore execution has processed 200 records
while : ; do
- NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
- NUM_RECORDS=$(get_metric_processed_records ${OPERATOR})
+ NUM_METRICS=$(get_num_metric_samples ${OPERATOR} "${JOB_NAME}")
+ NUM_RECORDS=$(get_metric_processed_records ${OPERATOR} "${JOB_NAME}")
# only account for metrics that appeared in the second execution
if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
@@ -541,7 +552,7 @@ function wait_oper_metric_num_in_records {
fi
if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then
- echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+ echo "Waiting for job to process up to ${MAX_NUM_METRICS} records, current progress: ${NUM_RECORDS} records ..."
sleep 1
else
break
http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
new file mode 100755
index 0000000..fb911f3
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+STATE_BACKEND_TYPE="${1:-file}"
+STATE_BACKEND_FILE_ASYNC="${2:-false}"
+TTL="${3:-1000}"
+PRECISION="${4:-5}"
+PARALLELISM="${5-3}"
+UPDATE_NUM="${6-1000}"
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-stream-state-ttl-test
+TEST_PROGRAM_NAME=DataStreamStateTTLTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+setup_flink_slf4j_metric_reporter
+function test_cleanup {
+ # don't call ourselves again for another signal interruption
+ trap "exit -1" INT
+ # don't call ourselves again for normal exit
+ trap "" EXIT
+
+ # revert our modifications to the Flink distribution
+ rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+start_cluster
+start_taskmanagers $PARALLELISM
+
+# Prints the command line that submits the TTL test job in detached mode.
+#
+# Arguments:
+#   $1 - (optional) savepoint path; when non-empty the job is restored from it via "-s"
+function job_id() {
+    local savepoint_arg=""
+    if [ -n "$1" ]; then
+        savepoint_arg="-s $1"
+    fi
+    CMD="${FLINK_DIR}/bin/flink run -d ${savepoint_arg} -p ${PARALLELISM} ${TEST_PROGRAM_JAR} \
+        --test.semantics exactly-once \
+        --environment.parallelism ${PARALLELISM} \
+        --state_backend ${STATE_BACKEND_TYPE} \
+        --state_ttl_verifier.ttl_milli ${TTL} \
+        --state_ttl_verifier.precision_milli ${PRECISION} \
+        --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
+        --state_backend.file.async ${STATE_BACKEND_FILE_ASYNC} \
+        --update_generator_source.sleep_time 10 \
+        --update_generator_source.sleep_after_elements 1"
+    echo "${CMD}"
+}
+
+# First run: start from scratch and wait until the verifier operator has seen enough records.
+JOB_CMD=$(job_id)
+echo ${JOB_CMD}
+JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g')
+wait_job_running ${JOB}
+wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} 'State TTL test job'
+
+SAVEPOINT_PATH=$(take_savepoint ${JOB} ${TEST_DATA_DIR} \
+    | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job ${JOB}
+
+# Without a savepoint path the second run could not restore any state, so fail loudly
+# instead of silently testing a fresh job.
+if [ -z "${SAVEPOINT_PATH}" ]; then
+    echo "Could not determine the savepoint path, cannot restore the job"
+    exit 1
+fi
+
+# Second run: restore from the savepoint taken above and verify again.
+JOB_CMD=$(job_id "${SAVEPOINT_PATH}")
+echo ${JOB_CMD}
+JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g')
+wait_job_running ${JOB}
+wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} "State TTL test job"
+
+# if verification fails job produces failed TTL'ed state updates,
+# output would be non-empty and the test will not pass