You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/05/13 08:29:53 UTC

[flink] branch release-1.6 updated: [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 0dda6fe  [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
0dda6fe is described below

commit 0dda6fe9dff4f667b110cda39bfe9738ba615b24
Author: Congxian Qiu <qc...@gmail.com>
AuthorDate: Mon May 13 16:29:34 2019 +0800

    [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
    
    This closes #8338.
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  16 +-
 .../tasks/OneInputStreamTaskTestHarness.java       |  43 +++-
 .../runtime/tasks/StreamConfigChainer.java         |  23 +-
 .../runtime/tasks/StreamMockEnvironment.java       |   8 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |  21 +-
 .../state/StatefulOperatorChainedTaskTest.java     | 259 +++++++++++++++++++++
 6 files changed, 357 insertions(+), 13 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985..62c540f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -270,6 +270,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** Shared wrapper for batch writes to the RocksDB instance. */
 	private RocksDBWriteBatchWrapper writeBatchWrapper;
 
+	/** Name of this backend's sub-directory for local snapshots, derived from the backend UID without dashes. */
+	private final String localDirectoryName;
+
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
 		ClassLoader userCodeClassLoader,
@@ -319,6 +322,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.restoredKvStateMetaInfos = new HashMap<>();
 		this.materializedSstFiles = new TreeMap<>();
 		this.backendUID = UUID.randomUUID();
+		this.localDirectoryName = this.backendUID.toString().replaceAll("[\\-]", "");
 
 		this.snapshotStrategy = enableIncrementalCheckpointing ?
 			new IncrementalSnapshotStrategy() :
@@ -1977,17 +1981,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
 				File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
 
-				if (directory.exists()) {
-					FileUtils.deleteDirectory(directory);
-				}
-
-				if (!directory.mkdirs()) {
+				if (!directory.exists() && !directory.mkdirs()) {
 					throw new IOException("Local state base directory for checkpoint " + checkpointId +
 						" already exists: " + directory);
 				}
 
 				// introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
-				File rdbSnapshotDir = new File(directory, "rocks_db");
+				// use localDirectoryName as the sub-directory name to avoid directory collisions when two stateful operators are chained in one task.
+				File rdbSnapshotDir = new File(directory, localDirectoryName);
+				if (rdbSnapshotDir.exists()) {
+					FileUtils.deleteDirectory(rdbSnapshotDir);
+				}
 				Path path = new Path(rdbSnapshotDir.toURI());
 				// create a "permanent" snapshot directory because local recovery is active.
 				snapshotDirectory = SnapshotDirectory.permanent(path);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 89a4f81..0a7efda 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -25,7 +25,10 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.function.Function;
 
@@ -56,16 +59,48 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 
 	/**
 	 * Creates a test harness with the specified number of input gates and specified number
-	 * of channels per input gate.
+	 * of channels per input gate, with local recovery disabled.
+	 */
+	public OneInputStreamTaskTestHarness(
+		Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+		int numInputGates,
+		int numInputChannelsPerGate,
+		TypeInformation<IN> inputType,
+		TypeInformation<OUT> outputType) {
+		this(taskFactory, numInputGates, numInputChannelsPerGate, inputType, outputType, TestLocalRecoveryConfig.disabled());
+	}
+
+	public OneInputStreamTaskTestHarness(
+		Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+		int numInputGates,
+		int numInputChannelsPerGate,
+		TypeInformation<IN> inputType,
+		TypeInformation<OUT> outputType,
+		File localRootDir) {
+		super(taskFactory, outputType, localRootDir);
+
+		this.inputType = inputType;
+		inputSerializer = inputType.createSerializer(executionConfig);
+
+		this.numInputGates = numInputGates;
+		this.numInputChannelsPerGate = numInputChannelsPerGate;
+
+		streamConfig.setStateKeySerializer(inputSerializer);
+	}
+
+	/**
+	 * Creates a test harness with the specified number of input gates and specified number
+	 * of channels per input gate, using the specified local recovery configuration.
 	 */
 	public OneInputStreamTaskTestHarness(
 			Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
 			int numInputGates,
 			int numInputChannelsPerGate,
 			TypeInformation<IN> inputType,
-			TypeInformation<OUT> outputType) {
+			TypeInformation<OUT> outputType,
+			LocalRecoveryConfig localRecoveryConfig) {
 
-		super(taskFactory, outputType);
+		super(taskFactory, outputType, localRecoveryConfig);
 
 		this.inputType = inputType;
 		inputSerializer = inputType.createSerializer(executionConfig);
@@ -82,7 +117,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 			TypeInformation<IN> inputType,
 			TypeInformation<OUT> outputType) {
 
-		this(taskFactory, 1, 1, inputType, outputType);
+		this(taskFactory, 1, 1, inputType, outputType, TestLocalRecoveryConfig.disabled());
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 10e50ce..e19e8d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -67,11 +67,28 @@ public class StreamConfigChainer {
 		return chain(operatorID, operator, typeSerializer, typeSerializer);
 	}
 
+	public <T> StreamConfigChainer chain(
+		OperatorID operatorID,
+		OneInputStreamOperator<T, T> operator,
+		TypeSerializer<T> typeSerializer,
+		boolean createKeyedStateBackend) {
+		return chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend);
+	}
+
+	public <IN, OUT> StreamConfigChainer chain(
+		OperatorID operatorID,
+		OneInputStreamOperator<IN, OUT> operator,
+		TypeSerializer<IN> inputSerializer,
+		TypeSerializer<OUT> outputSerializer) {
+		return chain(operatorID, operator, inputSerializer, outputSerializer, false);
+	}
+
 	public <IN, OUT> StreamConfigChainer chain(
 			OperatorID operatorID,
 			OneInputStreamOperator<IN, OUT> operator,
 			TypeSerializer<IN> inputSerializer,
-			TypeSerializer<OUT> outputSerializer) {
+			TypeSerializer<OUT> outputSerializer,
+			boolean createKeyedStateBackend) {
 		chainIndex++;
 
 		tailConfig.setChainedOutputs(Collections.singletonList(
@@ -88,6 +105,10 @@ public class StreamConfigChainer {
 		tailConfig.setTypeSerializerIn1(inputSerializer);
 		tailConfig.setTypeSerializerOut(outputSerializer);
 		tailConfig.setChainIndex(chainIndex);
+		if (createKeyedStateBackend) {
+			// used to test multiple stateful operators chained in a single task.
+			tailConfig.setStateKeySerializer(inputSerializer);
+		}
 
 		chainedConfigs.put(chainIndex, tailConfig);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 32de8d5..8f191b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
 
+	private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
+
 	public StreamMockEnvironment(
 		Configuration jobConfig,
 		Configuration taskConfig,
@@ -322,7 +324,11 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TestingTaskManagerRuntimeInfo();
+		return this.taskManagerRuntimeInfo;
+	}
+
+	public void setTaskManagerInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
+		this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index b2f1b99..f46f91b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
@@ -26,10 +27,14 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
@@ -47,6 +52,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -109,7 +115,20 @@ public class StreamTaskTestHarness<OUT> {
 	public StreamTaskTestHarness(
 			Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
 			TypeInformation<OUT> outputType) {
+		this(taskFactory, outputType, TestLocalRecoveryConfig.disabled());
+	}
 
+	public StreamTaskTestHarness(
+		Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+		TypeInformation<OUT> outputType,
+		File localRootDir) {
+		this(taskFactory, outputType, new LocalRecoveryConfig(true, new LocalRecoveryDirectoryProviderImpl(localRootDir, new JobID(), new JobVertexID(), 0)));
+	}
+
+	public StreamTaskTestHarness(
+		Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
+		TypeInformation<OUT> outputType,
+		LocalRecoveryConfig localRecoveryConfig) {
 		this.taskFactory = checkNotNull(taskFactory);
 		this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
 		this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
@@ -123,7 +142,7 @@ public class StreamTaskTestHarness<OUT> {
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
 
-		this.taskStateManager = new TestTaskStateManager();
+		this.taskStateManager = new TestTaskStateManager(localRecoveryConfig);
 	}
 
 	public ProcessingTimeService getProcessingTimeService() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
new file mode 100644
index 0000000..d36b5e2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static org.apache.flink.configuration.CheckpointingOptions.INCREMENTAL_CHECKPOINTS;
+import static org.apache.flink.configuration.CheckpointingOptions.STATE_BACKEND;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests snapshot and restore of multiple stateful operators chained in a single task.
+ */
+public class StatefulOperatorChainedTaskTest {
+
+	private static final Set<OperatorID> RESTORED_OPERATORS = ConcurrentHashMap.newKeySet();
+	private TemporaryFolder temporaryFolder;
+
+	@Before
+	public void setup() throws IOException {
+		RESTORED_OPERATORS.clear();
+		temporaryFolder = new TemporaryFolder();
+		temporaryFolder.create();
+	}
+
+	@Test
+	public void testMultipleStatefulOperatorChainedSnapshotAndRestore() throws Exception {
+
+		OperatorID headOperatorID = new OperatorID(42L, 42L);
+		OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+		JobManagerTaskRestore restore = createRunAndCheckpointOperatorChain(
+			headOperatorID,
+			new CounterOperator("head"),
+			tailOperatorID,
+			new CounterOperator("tail"),
+			Optional.empty());
+
+		TaskStateSnapshot stateHandles = restore.getTaskStateSnapshot();
+
+		assertEquals(2, stateHandles.getSubtaskStateMappings().size());
+
+		createRunAndCheckpointOperatorChain(
+			headOperatorID,
+			new CounterOperator("head"),
+			tailOperatorID,
+			new CounterOperator("tail"),
+			Optional.of(restore));
+
+		assertEquals(new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS);
+	}
+
+	private JobManagerTaskRestore createRunAndCheckpointOperatorChain(
+		OperatorID headId,
+		OneInputStreamOperator<String, String> headOperator,
+		OperatorID tailId,
+		OneInputStreamOperator<String, String> tailOperator,
+		Optional<JobManagerTaskRestore> restore) throws Exception {
+
+		File localRootDir = temporaryFolder.newFolder();
+		final OneInputStreamTaskTestHarness<String, String> testHarness =
+			new OneInputStreamTaskTestHarness<>(
+				OneInputStreamTask::new,
+				1, 1,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				localRootDir);
+
+		testHarness.setupOperatorChain(headId, headOperator)
+			.chain(tailId, tailOperator, StringSerializer.INSTANCE, true)
+			.finish();
+
+		if (restore.isPresent()) {
+			JobManagerTaskRestore taskRestore = restore.get();
+			testHarness.setTaskStateSnapshot(
+				taskRestore.getRestoreCheckpointId(),
+				taskRestore.getTaskStateSnapshot());
+		}
+
+		StreamMockEnvironment environment = new StreamMockEnvironment(
+			testHarness.jobConfig,
+			testHarness.taskConfig,
+			testHarness.getExecutionConfig(),
+			testHarness.memorySize,
+			new MockInputSplitProvider(),
+			testHarness.bufferSize,
+			testHarness.getTaskStateManager());
+
+		Configuration configuration = new Configuration();
+		configuration.setString(STATE_BACKEND.key(), "rocksdb");
+		File file = temporaryFolder.newFolder();
+		configuration.setString(CHECKPOINTS_DIRECTORY.key(), file.toURI().toString());
+		configuration.setString(INCREMENTAL_CHECKPOINTS.key(), "true");
+		environment.setTaskManagerInfo(
+			new TestingTaskManagerRuntimeInfo(
+				configuration,
+				System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator)));
+		testHarness.invoke(environment);
+		testHarness.waitForTaskRunning();
+
+		OneInputStreamTask<String, String> streamTask = testHarness.getTask();
+
+		processRecords(testHarness);
+		triggerCheckpoint(testHarness, streamTask);
+
+		TestTaskStateManager taskStateManager = testHarness.getTaskStateManager();
+
+		JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(
+			taskStateManager.getReportedCheckpointId(),
+			taskStateManager.getLastJobManagerTaskStateSnapshot());
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+		return jobManagerTaskRestore;
+	}
+
+	private void triggerCheckpoint(
+		OneInputStreamTaskTestHarness<String, String> testHarness,
+		OneInputStreamTask<String, String> streamTask) throws Exception {
+
+		long checkpointId = 1L;
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 1L);
+
+		testHarness.getTaskStateManager().setWaitForReportLatch(new OneShotLatch());
+
+		while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {}
+
+		testHarness.getTaskStateManager().getWaitForReportLatch().await();
+		long reportedCheckpointId = testHarness.getTaskStateManager().getReportedCheckpointId();
+
+		assertEquals(checkpointId, reportedCheckpointId);
+	}
+
+	private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception {
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+		testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+		testHarness.waitForInputProcessing();
+
+		expectedOutput.add(new StreamRecord<>("10"));
+		expectedOutput.add(new StreamRecord<>("20"));
+		expectedOutput.add(new StreamRecord<>("30"));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	private abstract static class RestoreWatchOperator<IN, OUT>
+		extends AbstractStreamOperator<OUT>
+		implements OneInputStreamOperator<IN, OUT> {
+
+		@Override
+		public void initializeState(StateInitializationContext context) throws Exception {
+			if (context.isRestored()) {
+				RESTORED_OPERATORS.add(getOperatorID());
+			}
+		}
+	}
+
+	/**
+	 * Operator that counts processed messages and stores the count in keyed state on snapshot.
+	 */
+	private static class CounterOperator extends RestoreWatchOperator<String, String> {
+		private static final long serialVersionUID = 2048954179291813243L;
+
+		private static long snapshotOutData = 0L;
+		private ValueState<Long> counterState;
+		private long counter = 0;
+		private String prefix;
+
+		CounterOperator(String prefix) {
+			this.prefix = prefix;
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			counter++;
+			output.collect(element);
+		}
+
+		@Override
+		public void initializeState(StateInitializationContext context) throws Exception {
+			super.initializeState(context);
+
+			counterState = context
+				.getKeyedStateStore()
+				.getState(new ValueStateDescriptor<>(prefix + "counter-state", LongSerializer.INSTANCE));
+
+			// set the key manually so the keyed ValueState below can be accessed outside of element processing.
+			setCurrentKey("10");
+
+			if (context.isRestored()) {
+				counter =  counterState.value();
+				assertEquals(snapshotOutData, counter);
+				counterState.clear();
+			}
+		}
+
+		@Override
+		public void snapshotState(StateSnapshotContext context) throws Exception {
+			counterState.update(counter);
+			snapshotOutData = counter;
+		}
+	}
+}