Posted to commits@flink.apache.org by se...@apache.org on 2016/12/02 12:21:40 UTC

flink git commit: [FLINK-5218] [state backends] Add test that validates that Checkpoint Streams are eagerly closed on cancellation.

Repository: flink
Updated Branches:
  refs/heads/master d2607170e -> cc006ff18


[FLINK-5218] [state backends] Add test that validates that Checkpoint Streams are eagerly closed on cancellation.

This is important for some stream implementations (such as HDFS) that do not properly
handle thread interruption.
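
A minimal sketch of the failure mode (the class below is made up for illustration and is
not part of this commit): a write() that swallows interrupts can only be released by
closing the stream from another thread, which is what eager close on cancellation provides.

    import java.io.IOException;
    import java.io.OutputStream;

    class NonInterruptibleStream extends OutputStream {

        private final Object lock = new Object();
        private volatile boolean closed;

        @Override
        public void write(int b) throws IOException {
            synchronized (lock) {
                // like some HDFS client streams, this write does not react to interrupts
                while (!closed) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignored) {
                        // interrupt is swallowed; interrupting the writer thread does not help
                    }
                }
                throw new IOException("stream closed");
            }
        }

        @Override
        public void close() {
            synchronized (lock) {
                closed = true;
                lock.notifyAll(); // an eager close() from the canceling thread releases the writer
            }
        }
    }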


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc006ff1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc006ff1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc006ff1

Branch: refs/heads/master
Commit: cc006ff18cc7032de3be3fdd9ef7ad383e88bba0
Parents: d260717
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Dec 1 17:12:12 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 2 12:37:00 2016 +0100

----------------------------------------------------------------------
 .../flink/hdfstests/FileStateBackendTest.java   |   9 +-
 .../filesystem/FsCheckpointStreamFactory.java   |  82 +++--
 .../runtime/state/FileStateBackendTest.java     |   5 +-
 .../FsCheckpointStateOutputStreamTest.java      |  32 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  | 300 +++++++++++++++++++
 5 files changed, 389 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 080485e..109d152 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -47,6 +47,7 @@ import java.util.UUID;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -94,11 +95,9 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 		catch (Exception ignored) {}
 	}
 
-	private URI stateBaseURI;
-
 	@Override
 	protected FsStateBackend getStateBackend() throws Exception {
-		stateBaseURI = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
+		URI stateBaseURI = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
 		return new FsStateBackend(stateBaseURI);
 
 	}
@@ -191,8 +190,8 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 			validateBytesInStream(handle2.openInputStream(), state2);
 			handle2.discardState();
 
-			validateBytesInStream(handle3.openInputStream(), state3);
-			handle3.discardState();
+			// stream 3 had zero bytes written, so closeAndGetHandle() returned null
+			assertNull(handle3);
 
 			validateBytesInStream(handle4.openInputStream(), state4);
 			handle4.discardState();

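After this change, closeAndGetHandle() returns null when no bytes were ever written to the
stream, so callers need to handle a null state handle. A minimal caller-side sketch of that
contract (class and method names below are made up; only the Flink types imported elsewhere
in this commit are assumed):

    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
    import org.apache.flink.runtime.state.StreamStateHandle;

    class NullableHandleExample {

        // returns null if nothing was written, mirroring closeAndGetHandle()
        static StreamStateHandle writeOrNull(
                CheckpointStreamFactory factory,
                byte[] data,
                long checkpointId,
                long timestamp) throws Exception {

            CheckpointStateOutputStream out =
                    factory.createCheckpointStateOutputStream(checkpointId, timestamp);
            if (data.length > 0) {
                out.write(data);
            }

            StreamStateHandle handle = out.closeAndGetHandle();
            if (handle == null) {
                // no bytes were written: no file was created and there is nothing to discard
                return null;
            }
            return handle;
        }
    }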
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index fcc97b3..135ee04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -156,12 +156,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		private final Path basePath;
 
 		private final FileSystem fs;
-		
+
 		private Path statePath;
-		
-		private boolean closed;
 
-		private boolean isEmpty = true;
+		private volatile boolean closed;
 
 		public FsCheckpointStateOutputStream(
 					Path basePath, FileSystem fs,
@@ -170,7 +168,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
 			}
-			
+
 			this.basePath = basePath;
 			this.fs = fs;
 			this.writeBuffer = new byte[bufferSize];
@@ -183,8 +181,6 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 				flush();
 			}
 			writeBuffer[pos++] = (byte) b;
-
-			isEmpty = false;
 		}
 
 		@Override
@@ -198,7 +194,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 					off += remaining;
 					len -= remaining;
 					pos += remaining;
-					
+
 					// flush the write buffer to make it clear again
 					flush();
 				}
@@ -213,7 +209,6 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 				// write the bytes directly
 				outStream.write(b, off, len);
 			}
-			isEmpty = false;
 		}
 
 		@Override
@@ -226,32 +221,18 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			if (!closed) {
 				// initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
 				if (outStream == null) {
-					// make sure the directory for that specific checkpoint exists
-					fs.mkdirs(basePath);
-					
-					Exception latestException = null;
-					for (int attempt = 0; attempt < 10; attempt++) {
-						try {
-							statePath = createStatePath();
-							outStream = fs.create(statePath, false);
-							break;
-						}
-						catch (Exception e) {
-							latestException = e;
-						}
-					}
-					
-					if (outStream == null) {
-						throw new IOException("Could not open output stream for state backend", latestException);
-					}
+					createStream();
 				}
-				
+
 				// now flush
 				if (pos > 0) {
 					outStream.write(writeBuffer, 0, pos);
 					pos = 0;
 				}
 			}
+			else {
+				throw new IOException("closed");
+			}
 		}
 
 		@Override
@@ -260,6 +241,14 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		}
 
 		/**
+		 * Checks whether the stream is closed.
+		 * @return True if the stream was closed, false if it is still open.
+		 */
+		public boolean isClosed() {
+			return closed;
+		}
+
+		/**
 		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
 		 * feature, for example). This method throws no exception if the deletion fails, but only
 		 * logs the error.
@@ -268,6 +257,11 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		public void close() {
 			if (!closed) {
 				closed = true;
+
+				// make sure write requests go to 'flush()', where they will recognize
+				// that the stream is closed
+				pos = writeBuffer.length;
+
 				if (outStream != null) {
 					try {
 						outStream.close();
@@ -287,7 +281,8 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		@Override
 		public StreamStateHandle closeAndGetHandle() throws IOException {
-			if (isEmpty) {
+			// check if there was nothing ever written
+			if (outStream == null && pos == 0) {
 				return null;
 			}
 
@@ -296,19 +291,23 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 					if (outStream == null && pos <= localStateThreshold) {
 						closed = true;
 						byte[] bytes = Arrays.copyOf(writeBuffer, pos);
+						pos = writeBuffer.length;
 						return new ByteStreamStateHandle(createStatePath().toString(), bytes);
 					}
 					else {
 						flush();
 
+						closed = true;
+						pos = writeBuffer.length;
+
 						long size = -1;
 						// make a best effort attempt to figure out the size
 						try {
 							size = outStream.getPos();
 						} catch (Exception ignored) {}
-						
+
 						outStream.close();
-						closed = true;
+
 						return new FileStateHandle(statePath, size);
 					}
 				}
@@ -321,5 +320,26 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		private Path createStatePath() {
 			return new Path(basePath, UUID.randomUUID().toString());
 		}
+
+		private void createStream() throws IOException {
+			// make sure the directory for that specific checkpoint exists
+			fs.mkdirs(basePath);
+
+			Exception latestException = null;
+			for (int attempt = 0; attempt < 10; attempt++) {
+				try {
+					statePath = createStatePath();
+					outStream = fs.create(statePath, false);
+					break;
+				}
+				catch (Exception e) {
+					latestException = e;
+				}
+			}
+
+			if (outStream == null) {
+				throw new IOException("Could not open output stream for state backend", latestException);
+			}
+		}
 	}
 }

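The reworked FsCheckpointStateOutputStream above can be closed safely from a thread other
than the writer: close() flips the volatile flag and fills the buffer position so that the
next write() is forced into flush(), which now throws once the stream is closed. A minimal
sketch of the eager-close-on-cancellation pattern this enables (the registry class below is
a made-up illustration, not Flink's actual cancellation code):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class OpenStreamRegistry {

        private final Set<Closeable> openStreams =
                Collections.newSetFromMap(new ConcurrentHashMap<Closeable, Boolean>());

        void register(Closeable stream) {
            openStreams.add(stream);
        }

        void unregister(Closeable stream) {
            openStreams.remove(stream);
        }

        // called from the canceling thread: closing the streams releases writer threads
        // that are blocked in write() and makes further writes fail fast
        void closeAllOnCancel() {
            for (Closeable stream : openStreams) {
                try {
                    stream.close();
                } catch (IOException ignored) {
                    // best effort: cancellation must not fail because a close failed
                }
            }
        }
    }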
http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 0b04ebc..57f4572 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -38,6 +38,7 @@ import java.util.Random;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -138,8 +139,8 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 			validateBytesInStream(handle2.openInputStream(), state2);
 			handle2.discardState();
 
-			validateBytesInStream(handle3.openInputStream(), state3);
-			handle3.discardState();
+			// nothing was written to stream 3, so closeAndGetHandle() returned null
+			assertNull(handle3);
 
 			validateBytesInStream(handle4.openInputStream(), state4);
 			handle4.discardState();

http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 00d8ca8..6d371b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -21,17 +21,22 @@ package org.apache.flink.runtime.state.filesystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Random;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class FsCheckpointStateOutputStreamTest {
 
@@ -57,7 +62,7 @@ public class FsCheckpointStateOutputStreamTest {
 	}
 
 	@Test
-	public void testStateBlowMemThreshold() throws Exception {
+	public void testStateBelowMemThreshold() throws Exception {
 		runTest(222, 999, 512, false);
 	}
 
@@ -150,4 +155,29 @@ public class FsCheckpointStateOutputStreamTest {
 
 		handle.discardState();
 	}
+
+	@Test
+	public void testWriteFailsFastWhenClosed() throws Exception {
+		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+				TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+
+		assertFalse(stream.isClosed());
+
+		stream.close();
+		assertTrue(stream.isClosed());
+
+		try {
+			stream.write(1);
+			fail();
+		} catch (IOException e) {
+			// expected
+		}
+
+		try {
+			stream.write(new byte[4], 1, 2);
+			fail();
+		} catch (IOException e) {
+			// expected
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cc006ff1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..6e96400
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * still lead to proper task cancellation, because the checkpoint streams are closed eagerly.
+ */
+public class BlockingCheckpointsTest {
+
+	private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+	@Test
+	public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+		Configuration taskConfig = new Configuration();
+		StreamConfig cfg = new StreamConfig(taskConfig);
+		cfg.setStreamOperator(new TestOperator());
+		cfg.setStateBackend(new LockingStreamStateBackend());
+
+		Task task = createTask(taskConfig);
+
+		// start the task and wait until it is in "restore"
+		task.startTaskThread();
+		IN_CHECKPOINT_LATCH.await();
+
+		// cancel the task and wait. unless cancellation properly closes
+		// the streams, this will never terminate
+		task.cancelExecution();
+		task.getExecutingThread().join();
+
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		assertNull(task.getFailureCause());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static Task createTask(Configuration taskConfig) throws IOException {
+
+		JobInformation jobInformation = new JobInformation(
+				new JobID(),
+				"test job name",
+				new SerializedValue<>(new ExecutionConfig()),
+				new Configuration(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList());
+
+		TaskInformation taskInformation = new TaskInformation(
+				new JobVertexID(),
+				"test task name",
+				1,
+				11,
+				TestStreamTask.class.getName(),
+				taskConfig);
+
+		TaskKvStateRegistry mockKvRegistry = mock(TaskKvStateRegistry.class);
+		NetworkEnvironment network = mock(NetworkEnvironment.class);
+		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry);
+
+		return new Task(
+				jobInformation,
+				taskInformation,
+				new ExecutionAttemptID(),
+				0,
+				0,
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList(),
+				0,
+				null,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				network,
+				mock(BroadcastVariableManager.class),
+				mock(TaskManagerConnection.class),
+				mock(InputSplitProvider.class),
+				mock(CheckpointResponder.class),
+				new FallbackLibraryCacheManager(),
+				new FileCache(new Configuration()),
+				new TaskManagerRuntimeInfo(
+						"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+				new UnregisteredTaskMetricsGroup(),
+				mock(ResultPartitionConsumableNotifier.class),
+				mock(PartitionStateChecker.class),
+				Executors.directExecutor());
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  state backend with locking output stream
+	// ------------------------------------------------------------------------
+
+	private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+			return new LockingOutputStreamFactory();
+		}
+
+		@Override
+		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+				Environment env, JobID jobID, String operatorIdentifier,
+				TypeSerializer<K> keySerializer, int numberOfKeyGroups,
+				KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception {
+
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+				Environment env, JobID jobID, String operatorIdentifier,
+				TypeSerializer<K> keySerializer, int numberOfKeyGroups,
+				KeyGroupRange keyGroupRange, Collection<KeyGroupsStateHandle> restoredState,
+				TaskKvStateRegistry kvStateRegistry) throws Exception {
+
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
+
+		@Override
+		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) {
+			return new LockingOutputStream();
+		}
+
+		@Override
+		public void close() {}
+	}
+
+	private static final class LockingOutputStream extends CheckpointStateOutputStream {
+
+		private final Object lock = new Object();
+		private volatile boolean closed;
+
+		@Override
+		public StreamStateHandle closeAndGetHandle() throws IOException {
+			return null;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			// this needs to not react to interrupts until the handle is closed
+			synchronized (lock) {
+				while (!closed) {
+					try {
+						lock.wait();
+					}
+					catch (InterruptedException ignored) {}
+				}
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			synchronized (lock) {
+				closed = true;
+				lock.notifyAll();
+			}
+		}
+
+		@Override
+		public long getPos() {
+			return 0;
+		}
+
+		@Override
+		public void flush() {}
+
+		@Override
+		public void sync() {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test operator that calls into the locking checkpoint output stream
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class TestOperator extends StreamFilter<Object> {
+		private static final long serialVersionUID = 1L;
+
+		public TestOperator() {
+			super(new FilterFunction<Object>() {
+				@Override
+				public boolean filter(Object value) {
+					return false;
+				}
+			});
+		}
+
+		@Override
+		public void snapshotState(StateSnapshotContext context) throws Exception {
+			OperatorStateCheckpointOutputStream outStream = context.getRawOperatorStateOutput();
+
+			IN_CHECKPOINT_LATCH.trigger();
+
+			// this should lock
+			outStream.write(1);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  stream task that simply triggers a checkpoint
+	// ------------------------------------------------------------------------
+
+	public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+		@Override
+		public void init() {}
+
+		@Override
+		protected void run() throws Exception {
+			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()));
+		}
+
+		@Override
+		protected void cleanup() {}
+
+		@Override
+		protected void cancelTask() {}
+	}
+}
\ No newline at end of file