You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/05/13 08:29:53 UTC
[flink] branch release-1.6 updated: [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 0dda6fe [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
0dda6fe is described below
commit 0dda6fe9dff4f667b110cda39bfe9738ba615b24
Author: Congxian Qiu <qc...@gmail.com>
AuthorDate: Mon May 13 16:29:34 2019 +0800
[FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
This closes #8338.
---
.../streaming/state/RocksDBKeyedStateBackend.java | 16 +-
.../tasks/OneInputStreamTaskTestHarness.java | 43 +++-
.../runtime/tasks/StreamConfigChainer.java | 23 +-
.../runtime/tasks/StreamMockEnvironment.java | 8 +-
.../runtime/tasks/StreamTaskTestHarness.java | 21 +-
.../state/StatefulOperatorChainedTaskTest.java | 259 +++++++++++++++++++++
6 files changed, 357 insertions(+), 13 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985..62c540f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -270,6 +270,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** Shared wrapper for batch writes to the RocksDB instance. */
private RocksDBWriteBatchWrapper writeBatchWrapper;
+ /** The local directory name of the current snapshot strategy. */
+ private final String localDirectoryName;
+
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -319,6 +322,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+ this.localDirectoryName = this.backendUID.toString().replaceAll("[\\-]", "");
this.snapshotStrategy = enableIncrementalCheckpointing ?
new IncrementalSnapshotStrategy() :
@@ -1977,17 +1981,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
- if (directory.exists()) {
- FileUtils.deleteDirectory(directory);
- }
-
- if (!directory.mkdirs()) {
+ if (!directory.exists() && !directory.mkdirs()) {
throw new IOException("Local state base directory for checkpoint " + checkpointId +
" already exists: " + directory);
}
// introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
- File rdbSnapshotDir = new File(directory, "rocks_db");
+ // append localDirectoryName here to avoid a directory collision when two stateful operators are chained in one task.
+ File rdbSnapshotDir = new File(directory, localDirectoryName);
+ if (rdbSnapshotDir.exists()) {
+ FileUtils.deleteDirectory(rdbSnapshotDir);
+ }
Path path = new Path(rdbSnapshotDir.toURI());
// create a "permanent" snapshot directory because local recovery is active.
snapshotDirectory = SnapshotDirectory.permanent(path);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 89a4f81..0a7efda 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -25,7 +25,10 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import java.io.File;
import java.io.IOException;
import java.util.function.Function;
@@ -56,16 +59,48 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
/**
* Creates a test harness with the specified number of input gates and specified number
- * of channels per input gate.
+ * of channels per input gate and local recovery disabled.
+ */
+ public OneInputStreamTaskTestHarness(
+ Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+ int numInputGates,
+ int numInputChannelsPerGate,
+ TypeInformation<IN> inputType,
+ TypeInformation<OUT> outputType) {
+ this(taskFactory, numInputGates, numInputChannelsPerGate, inputType, outputType, TestLocalRecoveryConfig.disabled());
+ }
+
+ public OneInputStreamTaskTestHarness(
+ Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+ int numInputGates,
+ int numInputChannelsPerGate,
+ TypeInformation<IN> inputType,
+ TypeInformation<OUT> outputType,
+ File localRootDir) {
+ super(taskFactory, outputType, localRootDir);
+
+ this.inputType = inputType;
+ inputSerializer = inputType.createSerializer(executionConfig);
+
+ this.numInputGates = numInputGates;
+ this.numInputChannelsPerGate = numInputChannelsPerGate;
+
+ streamConfig.setStateKeySerializer(inputSerializer);
+ }
+
+ /**
+ * Creates a test harness with the specified number of input gates and specified number
+ * of channels per input gate and specified localRecoveryConfig.
*/
public OneInputStreamTaskTestHarness(
Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
int numInputGates,
int numInputChannelsPerGate,
TypeInformation<IN> inputType,
- TypeInformation<OUT> outputType) {
+ TypeInformation<OUT> outputType,
+ LocalRecoveryConfig localRecoveryConfig) {
- super(taskFactory, outputType);
+ super(taskFactory, outputType, localRecoveryConfig);
this.inputType = inputType;
inputSerializer = inputType.createSerializer(executionConfig);
@@ -82,7 +117,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
TypeInformation<IN> inputType,
TypeInformation<OUT> outputType) {
- this(taskFactory, 1, 1, inputType, outputType);
+ this(taskFactory, 1, 1, inputType, outputType, TestLocalRecoveryConfig.disabled());
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 10e50ce..e19e8d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -67,11 +67,28 @@ public class StreamConfigChainer {
return chain(operatorID, operator, typeSerializer, typeSerializer);
}
+ public <T> StreamConfigChainer chain(
+ OperatorID operatorID,
+ OneInputStreamOperator<T, T> operator,
+ TypeSerializer<T> typeSerializer,
+ boolean createKeyedStateBackend) {
+ return chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend);
+ }
+
+ public <IN, OUT> StreamConfigChainer chain(
+ OperatorID operatorID,
+ OneInputStreamOperator<IN, OUT> operator,
+ TypeSerializer<IN> inputSerializer,
+ TypeSerializer<OUT> outputSerializer) {
+ return chain(operatorID, operator, inputSerializer, outputSerializer, false);
+ }
+
public <IN, OUT> StreamConfigChainer chain(
OperatorID operatorID,
OneInputStreamOperator<IN, OUT> operator,
TypeSerializer<IN> inputSerializer,
- TypeSerializer<OUT> outputSerializer) {
+ TypeSerializer<OUT> outputSerializer,
+ boolean createKeyedStateBackend) {
chainIndex++;
tailConfig.setChainedOutputs(Collections.singletonList(
@@ -88,6 +105,10 @@ public class StreamConfigChainer {
tailConfig.setTypeSerializerIn1(inputSerializer);
tailConfig.setTypeSerializerOut(outputSerializer);
tailConfig.setChainIndex(chainIndex);
+ if (createKeyedStateBackend) {
+ // used to test multiple stateful operators chained in a single task.
+ tailConfig.setStateKeySerializer(inputSerializer);
+ }
chainedConfigs.put(chainIndex, tailConfig);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 32de8d5..8f191b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment {
private TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
+ private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
+
public StreamMockEnvironment(
Configuration jobConfig,
Configuration taskConfig,
@@ -322,7 +324,11 @@ public class StreamMockEnvironment implements Environment {
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
- return new TestingTaskManagerRuntimeInfo();
+ return this.taskManagerRuntimeInfo;
+ }
+
+ public void setTaskManagerInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
+ this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index b2f1b99..f46f91b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
@@ -26,10 +27,14 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
@@ -47,6 +52,7 @@ import org.apache.flink.util.Preconditions;
import org.junit.Assert;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
@@ -109,7 +115,20 @@ public class StreamTaskTestHarness<OUT> {
public StreamTaskTestHarness(
Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
TypeInformation<OUT> outputType) {
+ this(taskFactory, outputType, TestLocalRecoveryConfig.disabled());
+ }
+ public StreamTaskTestHarness(
+ Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+ TypeInformation<OUT> outputType,
+ File localRootDir) {
+ this(taskFactory, outputType, new LocalRecoveryConfig(true, new LocalRecoveryDirectoryProviderImpl(localRootDir, new JobID(), new JobVertexID(), 0)));
+ }
+
+ public StreamTaskTestHarness(
+ Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+ TypeInformation<OUT> outputType,
+ LocalRecoveryConfig localRecoveryConfig) {
this.taskFactory = checkNotNull(taskFactory);
this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
@@ -123,7 +142,7 @@ public class StreamTaskTestHarness<OUT> {
outputSerializer = outputType.createSerializer(executionConfig);
outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
- this.taskStateManager = new TestTaskStateManager();
+ this.taskStateManager = new TestTaskStateManager(localRecoveryConfig);
}
public ProcessingTimeService getProcessingTimeService() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
new file mode 100644
index 0000000..d36b5e2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static org.apache.flink.configuration.CheckpointingOptions.INCREMENTAL_CHECKPOINTS;
+import static org.apache.flink.configuration.CheckpointingOptions.STATE_BACKEND;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests snapshot and restore of multiple stateful (keyed) operators chained in a single task.
+ */
+public class StatefulOperatorChainedTaskTest {
+
+ private static final Set<OperatorID> RESTORED_OPERATORS = ConcurrentHashMap.newKeySet();
+ private TemporaryFolder temporaryFolder;
+
+ @Before
+ public void setup() throws IOException {
+ RESTORED_OPERATORS.clear();
+ temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+ }
+
+ @Test
+ public void testMultipleStatefulOperatorChainedSnapshotAndRestore() throws Exception {
+
+ OperatorID headOperatorID = new OperatorID(42L, 42L);
+ OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+ JobManagerTaskRestore restore = createRunAndCheckpointOperatorChain(
+ headOperatorID,
+ new CounterOperator("head"),
+ tailOperatorID,
+ new CounterOperator("tail"),
+ Optional.empty());
+
+ TaskStateSnapshot stateHandles = restore.getTaskStateSnapshot();
+
+ assertEquals(2, stateHandles.getSubtaskStateMappings().size());
+
+ createRunAndCheckpointOperatorChain(
+ headOperatorID,
+ new CounterOperator("head"),
+ tailOperatorID,
+ new CounterOperator("tail"),
+ Optional.of(restore));
+
+ assertEquals(new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS);
+ }
+
+ private JobManagerTaskRestore createRunAndCheckpointOperatorChain(
+ OperatorID headId,
+ OneInputStreamOperator<String, String> headOperator,
+ OperatorID tailId,
+ OneInputStreamOperator<String, String> tailOperator,
+ Optional<JobManagerTaskRestore> restore) throws Exception {
+
+ File localRootDir = temporaryFolder.newFolder();
+ final OneInputStreamTaskTestHarness<String, String> testHarness =
+ new OneInputStreamTaskTestHarness<>(
+ OneInputStreamTask::new,
+ 1, 1,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ localRootDir);
+
+ testHarness.setupOperatorChain(headId, headOperator)
+ .chain(tailId, tailOperator, StringSerializer.INSTANCE, true)
+ .finish();
+
+ if (restore.isPresent()) {
+ JobManagerTaskRestore taskRestore = restore.get();
+ testHarness.setTaskStateSnapshot(
+ taskRestore.getRestoreCheckpointId(),
+ taskRestore.getTaskStateSnapshot());
+ }
+
+ StreamMockEnvironment environment = new StreamMockEnvironment(
+ testHarness.jobConfig,
+ testHarness.taskConfig,
+ testHarness.getExecutionConfig(),
+ testHarness.memorySize,
+ new MockInputSplitProvider(),
+ testHarness.bufferSize,
+ testHarness.getTaskStateManager());
+
+ Configuration configuration = new Configuration();
+ configuration.setString(STATE_BACKEND.key(), "rocksdb");
+ File file = temporaryFolder.newFolder();
+ configuration.setString(CHECKPOINTS_DIRECTORY.key(), file.toURI().toString());
+ configuration.setString(INCREMENTAL_CHECKPOINTS.key(), "true");
+ environment.setTaskManagerInfo(
+ new TestingTaskManagerRuntimeInfo(
+ configuration,
+ System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator)));
+ testHarness.invoke(environment);
+ testHarness.waitForTaskRunning();
+
+ OneInputStreamTask<String, String> streamTask = testHarness.getTask();
+
+ processRecords(testHarness);
+ triggerCheckpoint(testHarness, streamTask);
+
+ TestTaskStateManager taskStateManager = testHarness.getTaskStateManager();
+
+ JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(
+ taskStateManager.getReportedCheckpointId(),
+ taskStateManager.getLastJobManagerTaskStateSnapshot());
+
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ return jobManagerTaskRestore;
+ }
+
+ private void triggerCheckpoint(
+ OneInputStreamTaskTestHarness<String, String> testHarness,
+ OneInputStreamTask<String, String> streamTask) throws Exception {
+
+ long checkpointId = 1L;
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 1L);
+
+ testHarness.getTaskStateManager().setWaitForReportLatch(new OneShotLatch());
+
+ while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {}
+
+ testHarness.getTaskStateManager().getWaitForReportLatch().await();
+ long reportedCheckpointId = testHarness.getTaskStateManager().getReportedCheckpointId();
+
+ assertEquals(checkpointId, reportedCheckpointId);
+ }
+
+ private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception {
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+ testHarness.waitForInputProcessing();
+
+ expectedOutput.add(new StreamRecord<>("10"));
+ expectedOutput.add(new StreamRecord<>("20"));
+ expectedOutput.add(new StreamRecord<>("30"));
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+ }
+
+ private abstract static class RestoreWatchOperator<IN, OUT>
+ extends AbstractStreamOperator<OUT>
+ implements OneInputStreamOperator<IN, OUT> {
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ if (context.isRestored()) {
+ RESTORED_OPERATORS.add(getOperatorID());
+ }
+ }
+ }
+
+ /**
+ * Operator that counts processed messages and keeps the result in state.
+ */
+ private static class CounterOperator extends RestoreWatchOperator<String, String> {
+ private static final long serialVersionUID = 2048954179291813243L;
+
+ private static long snapshotOutData = 0L;
+ private ValueState<Long> counterState;
+ private long counter = 0;
+ private String prefix;
+
+ CounterOperator(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public void processElement(StreamRecord<String> element) throws Exception {
+ counter++;
+ output.collect(element);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ counterState = context
+ .getKeyedStateStore()
+ .getState(new ValueStateDescriptor<>(prefix + "counter-state", LongSerializer.INSTANCE));
+
+ // set the key manually so that the RocksDB-backed ValueState gets a serialized key.
+ setCurrentKey("10");
+
+ if (context.isRestored()) {
+ counter = counterState.value();
+ assertEquals(snapshotOutData, counter);
+ counterState.clear();
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ counterState.update(counter);
+ snapshotOutData = counter;
+ }
+ }
+}