You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/08/01 13:25:41 UTC

[flink] branch release-1.9 updated (a719700 -> c676eb4)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a719700  [FLINK-13374][scala][build] Set -Xss2m when compiling scala
     new 1ec3a4c  [FLINK-13326] Add closeInterruptibly()
     new be6edce  [FLINK-13326] Use ResourceGuard in NonClosingCheckpointStream
     new c676eb4  [FLINK-13326] Support async rawState checkpointing

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/util/ResourceGuard.java  |  39 ++++++--
 .../state/KeyedStateCheckpointOutputStream.java    |   4 +-
 .../state/NonClosingCheckpointOutputStream.java    |  24 ++++-
 .../state/OperatorStateCheckpointOutputStream.java |   2 +-
 .../state/StateSnapshotContextSynchronousImpl.java | 108 ++++++++++++++-------
 .../api/operators/AbstractStreamOperator.java      |  19 ++--
 .../api/operators/AbstractStreamOperatorTest.java  |   4 -
 .../StateSnapshotContextSynchronousImplTest.java   |  43 +++++++-
 8 files changed, 187 insertions(+), 56 deletions(-)


[flink] 02/03: [FLINK-13326] Use ResourceGuard in NonClosingCheckpointStream

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be6edce4e56bd1e3a88174c5904a0fdd9d39a7f3
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Wed Jul 17 15:16:32 2019 +0200

    [FLINK-13326] Use ResourceGuard in NonClosingCheckpointStream
    
    This commit adds the ability for users to indicate to Flink that the
    KeyedStateCheckpointOutputStream / OperatorStateCheckpointOutputStream should be closed
    at some point later in time. Previously it was expected to be closed within the synchrouns
    part of AbstractStreamOperator#snapshotState.
    Users might call KeyedStateCheckpointOutputStream#acquireLease(), and at some point
    in the future might release the least. Once the number of leases reaches 0, the stream
    would be closed.
---
 .../state/KeyedStateCheckpointOutputStream.java    |  4 ++--
 .../state/NonClosingCheckpointOutputStream.java    | 24 +++++++++++++++++++++-
 .../state/OperatorStateCheckpointOutputStream.java |  2 +-
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
index 2121574..38b1b9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStream.java
@@ -102,7 +102,7 @@ public final class KeyedStateCheckpointOutputStream extends NonClosingCheckpoint
 
 	@Override
 	KeyGroupsStateHandle closeAndGetHandle() throws IOException {
-		StreamStateHandle streamStateHandle = delegate.closeAndGetHandle();
+		StreamStateHandle streamStateHandle = super.closeAndGetHandleAfterLeasesReleased();
 		return streamStateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, streamStateHandle) : null;
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
index f7f4bdb..6d0a62e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NonClosingCheckpointOutputStream.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+import org.apache.flink.util.ResourceGuard.Lease;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -31,6 +33,8 @@ import java.io.OutputStream;
 public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHandle> extends OutputStream {
 
 	protected final CheckpointStreamFactory.CheckpointStateOutputStream delegate;
+	private final ResourceGuard resourceGuard = new ResourceGuard();
+	
 
 	public NonClosingCheckpointOutputStream(
 			CheckpointStreamFactory.CheckpointStateOutputStream delegate) {
@@ -63,6 +67,13 @@ public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHand
 		// TODO if we want to support async writes, this call could trigger a callback to the snapshot context that a handle is available.
 	}
 
+	/**
+	 * Returns a {@link org.apache.flink.util.ResourceGuard.Lease} that prevents closing this stream. To allow the system
+	 * to close this stream, each of the acquired leases need to call {@link Lease#close()}, on their acquired leases.
+	 */
+	public final ResourceGuard.Lease acquireLease() throws IOException {
+		return resourceGuard.acquireResource();
+	}
 
 	/**
 	 * This method should not be public so as to not expose internals to user code.
@@ -77,4 +88,15 @@ public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHand
 	 */
 	abstract T closeAndGetHandle() throws IOException;
 
-}
\ No newline at end of file
+	StreamStateHandle closeAndGetHandleAfterLeasesReleased() throws IOException {
+		try {
+			resourceGuard.closeInterruptibly();
+			return delegate.closeAndGetHandle();
+		}
+		catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			delegate.closeAndGetHandle();
+			throw new IOException("Interrupted while awaiting handle.", e);
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
index ba28631..0905ecb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
@@ -56,7 +56,7 @@ public final class OperatorStateCheckpointOutputStream
 	 */
 	@Override
 	OperatorStateHandle closeAndGetHandle() throws IOException {
-		StreamStateHandle streamStateHandle = delegate.closeAndGetHandle();
+		StreamStateHandle streamStateHandle = super.closeAndGetHandleAfterLeasesReleased();
 
 		if (null == streamStateHandle) {
 			return null;


[flink] 01/03: [FLINK-13326] Add closeInterruptibly()

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ec3a4c6239e15be742d585f42e63f4c9e060149
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Wed Jul 17 15:15:23 2019 +0200

    [FLINK-13326] Add closeInterruptibly()
---
 .../java/org/apache/flink/util/ResourceGuard.java  | 39 ++++++++++++++++++----
 1 file changed, 33 insertions(+), 6 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java b/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java
index 83660a6..d27f8c8 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ResourceGuard.java
@@ -94,13 +94,27 @@ public class ResourceGuard implements AutoCloseable, Serializable {
 		}
 	}
 
+	public void closeInterruptibly() throws InterruptedException {
+		synchronized (lock) {
+
+			closed = true;
+
+			while (leaseCount > 0) {
+				lock.wait();
+			}
+		}
+	}
+
 	/**
-	 * Closed the resource guard. This method will block until all calls to {@link #acquireResource()} have seen their
-	 * matching call to {@link #releaseResource()}.
+	 * If the current thread is {@linkplain Thread#interrupt interrupted}
+	 * while waiting for the close method to complete, then it will continue to wait.
+	 * When the thread does return from this method its interrupt
+	 * status will be set.
 	 */
-	@Override
-	public void close() {
+	@SuppressWarnings("WeakerAccess")
+	public void closeUninterruptibly()  {
 
+		boolean interrupted = false;
 		synchronized (lock) {
 
 			closed = true;
@@ -109,11 +123,24 @@ public class ResourceGuard implements AutoCloseable, Serializable {
 
 				try {
 					lock.wait();
-				} catch (InterruptedException ignore) {
-					// Even on interruption, we cannot terminate the loop until all open leases are closed.
+				} catch (InterruptedException e) {
+					interrupted = true;
 				}
 			}
 		}
+
+		if (interrupted) {
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Closed the resource guard. This method will block until all calls to {@link #acquireResource()} have seen their
+	 * matching call to {@link #releaseResource()}.
+	 */
+	@Override
+	public void close()  {
+		closeUninterruptibly();
 	}
 
 	/**


[flink] 03/03: [FLINK-13326] Support async rawState checkpointing

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c676eb42f151854b4e734f39e2d82565eda144b6
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Wed Jul 17 15:23:21 2019 +0200

    [FLINK-13326] Support async rawState checkpointing
    
    This commit adds the ability for users to obtain a lease for the
    rawOperator/rawKeyed output streams, during the synchronous part
    of a snapshot, thus preventing these streaming to close.
---
 .../state/StateSnapshotContextSynchronousImpl.java | 108 ++++++++++++++-------
 .../api/operators/AbstractStreamOperator.java      |  19 ++--
 .../api/operators/AbstractStreamOperatorTest.java  |   4 -
 .../StateSnapshotContextSynchronousImplTest.java   |  43 +++++++-
 4 files changed, 128 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index e0f9071..1e52676 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -20,34 +20,35 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.AsyncSnapshotCallable.AsyncSnapshotTask;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.RunnableFuture;
 
 /**
  * This class is a default implementation for StateSnapshotContext.
  */
-public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
 
 	/** Checkpoint id of the snapshot. */
 	private final long checkpointId;
 
 	/** Checkpoint timestamp of the snapshot. */
 	private final long checkpointTimestamp;
-	
-	/** Factory for the checkpointing stream */
+
+	/** Factory for the checkpointing stream. */
 	private final CheckpointStreamFactory streamFactory;
-	
-	/** Key group range for the operator that created this context. Only for keyed operators */
+
+	/** Key group range for the operator that created this context. Only for keyed operators. */
 	private final KeyGroupRange keyGroupRange;
 
 	/**
-	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be 
+	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be
 	 * obtained from and managed by the stream task.
 	 */
 	private final CloseableRegistry closableRegistry;
@@ -58,16 +59,18 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	/** Output stream for the raw operator state. */
 	private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
 
+	private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateCheckpointClosingFuture;
+	private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateCheckpointClosingFuture;
+
 	@VisibleForTesting
 	public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp) {
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
 		this.streamFactory = null;
 		this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
-		this.closableRegistry = null;
+		this.closableRegistry = new CloseableRegistry();
 	}
 
-
 	public StateSnapshotContextSynchronousImpl(
 			long checkpointId,
 			long checkpointTimestamp,
@@ -119,31 +122,26 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 
 	@Nonnull
 	public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
-		KeyedStateHandle keyGroupsStateHandle =
-			closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
-		return toDoneFutureOfSnapshotResult(keyGroupsStateHandle);
+		if (null == keyedStateCheckpointClosingFuture) {
+			StreamCloserCallable<KeyGroupsStateHandle> callable = new StreamCloserCallable<>(closableRegistry, keyedStateCheckpointOutputStream);
+			AsyncSnapshotTask asyncSnapshotTask = callable.toAsyncSnapshotFutureTask(closableRegistry);
+			keyedStateCheckpointClosingFuture = castAsKeyedStateHandle(asyncSnapshotTask);
+		}
+		return keyedStateCheckpointClosingFuture;
 	}
 
 	@Nonnull
 	public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
-		OperatorStateHandle operatorStateHandle =
-			closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
-		return toDoneFutureOfSnapshotResult(operatorStateHandle);
-	}
-
-	private <T extends StateObject> RunnableFuture<SnapshotResult<T>> toDoneFutureOfSnapshotResult(T handle) {
-		SnapshotResult<T> snapshotResult = SnapshotResult.of(handle);
-		return DoneFuture.of(snapshotResult);
+		if (null == operatorStateCheckpointClosingFuture) {
+			StreamCloserCallable<OperatorStateHandle> callable = new StreamCloserCallable<>(closableRegistry, operatorStateCheckpointOutputStream);
+			operatorStateCheckpointClosingFuture = callable.toAsyncSnapshotFutureTask(closableRegistry);
+		}
+		return operatorStateCheckpointClosingFuture;
 	}
 
-	private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(
-		NonClosingCheckpointOutputStream<T> stream) throws IOException {
-
-		if (stream != null && closableRegistry.unregisterCloseable(stream.getDelegate())) {
-			return stream.closeAndGetHandle();
-		} else {
-			return null;
-		}
+	@SuppressWarnings("unchecked")
+	private static RunnableFuture<SnapshotResult<KeyedStateHandle>> castAsKeyedStateHandle(RunnableFuture<?> asyncSnapshotTask) {
+		return (RunnableFuture<SnapshotResult<KeyedStateHandle>>) asyncSnapshotTask;
 	}
 
 	private <T extends StreamStateHandle> void closeAndUnregisterStream(
@@ -158,30 +156,70 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 		}
 	}
 
-	@Override
-	public void close() throws IOException {
+	public void closeExceptionally() throws IOException {
 		IOException exception = null;
-
 		if (keyedStateCheckpointOutputStream != null) {
 			try {
 				closeAndUnregisterStream(keyedStateCheckpointOutputStream);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e);
 			}
 		}
-
 		if (operatorStateCheckpointOutputStream != null) {
 			try {
 				closeAndUnregisterStream(operatorStateCheckpointOutputStream);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				exception = ExceptionUtils.firstOrSuppressed(
 					new IOException("Could not close the raw operator state checkpoint output stream.", e),
 					exception);
 			}
 		}
-
+		if (keyedStateCheckpointClosingFuture != null) {
+			keyedStateCheckpointClosingFuture.cancel(true);
+		}
+		if (operatorStateCheckpointClosingFuture != null) {
+			operatorStateCheckpointClosingFuture.cancel(true);
+		}
 		if (exception != null) {
 			throw exception;
 		}
 	}
+
+	private static final class StreamCloserCallable<T extends StreamStateHandle> extends AsyncSnapshotCallable<SnapshotResult<T>> {
+
+		@Nullable
+		private final NonClosingCheckpointOutputStream<T> stream;
+		private final CloseableRegistry closableRegistry;
+
+		StreamCloserCallable(CloseableRegistry closableRegistry, @Nullable NonClosingCheckpointOutputStream<T> stream) {
+			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+			this.stream = stream;
+		}
+
+		@Override
+		protected SnapshotResult<T> callInternal() throws Exception {
+			if (stream == null) {
+				return SnapshotResult.of(null);
+			}
+			if (!closableRegistry.unregisterCloseable(stream.getDelegate())) {
+				throw new IOException("Stream delegate appears to be closed, because it is no longer registered.");
+			}
+			T closed = stream.closeAndGetHandle();
+			return SnapshotResult.of(closed);
+		}
+
+		@Override
+		protected void cleanupProvidedResources() {
+			try {
+				if (stream != null && closableRegistry.unregisterCloseable(stream.getDelegate())) {
+					stream.closeAndGetHandle();
+				}
+			}
+			catch (IOException e) {
+				throw new IllegalStateException("Unable to cleanup a stream.", e);
+			}
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7d2eda5..2e4310a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -73,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Locale;
 
@@ -387,13 +388,14 @@ public abstract class AbstractStreamOperator<OUT>
 
 		OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
 
-		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
-				checkpointId,
-				timestamp,
-				factory,
-				keyGroupRange,
-				getContainingTask().getCancelables())) {
+		StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
+			checkpointId,
+			timestamp,
+			factory,
+			keyGroupRange,
+			getContainingTask().getCancelables());
 
+		try {
 			snapshotState(snapshotContext);
 
 			snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
@@ -421,6 +423,11 @@ public abstract class AbstractStreamOperator<OUT>
 			if (!getContainingTask().isCanceled()) {
 				LOG.info(snapshotFailMessage, snapshotException);
 			}
+			try {
+				snapshotContext.closeExceptionally();
+			} catch (IOException e) {
+				snapshotException.addSuppressed(e);
+			}
 			throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
 		}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 28e0ac4..36fb867 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -521,7 +521,6 @@ public class AbstractStreamOperatorTest {
 				CheckpointOptions.forCheckpointWithDefaultLocation(),
 				new MemCheckpointStreamFactory(Integer.MAX_VALUE));
 
-		verify(context).close();
 	}
 
 	/**
@@ -561,8 +560,6 @@ public class AbstractStreamOperatorTest {
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
 		}
-
-		verify(context).close();
 	}
 
 	/**
@@ -644,7 +641,6 @@ public class AbstractStreamOperatorTest {
 
 		// verify that the context has been closed, the operator snapshot result has been cancelled
 		// and that all futures have been cancelled.
-		verify(context).close();
 		verify(operatorSnapshotResult).cancel();
 
 		verify(futureKeyedStateHandle).cancel(anyBoolean());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 3a8250f..9c8c7ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -110,7 +110,48 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
 		assertTrue(closableRegistry.contains(outputStream1));
 		assertTrue(closableRegistry.contains(outputStream2));
 
-		context.close();
+		context.getKeyedStateStreamFuture().run();
+		context.getOperatorStateStreamFuture().run();
+
+		verify(outputStream1).closeAndGetHandle();
+		verify(outputStream2).closeAndGetHandle();
+
+		assertEquals(0, closableRegistry.size());
+	}
+
+	@Test
+	public void testStreamClosingExceptionally() throws Exception {
+		long checkpointId = 42L;
+		long checkpointTimestamp = 1L;
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
+
+		InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
+
+		KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
+
+		StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
+			checkpointId,
+			checkpointTimestamp,
+			streamFactory,
+			keyGroupRange,
+			closableRegistry);
+
+		// creating the output streams
+		context.getRawKeyedOperatorStateOutput();
+		context.getRawOperatorStateOutput();
+
+		verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+
+		assertEquals(2, closableRegistry.size());
+		assertTrue(closableRegistry.contains(outputStream1));
+		assertTrue(closableRegistry.contains(outputStream2));
+
+		context.closeExceptionally();
 
 		verify(outputStream1).close();
 		verify(outputStream2).close();