You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/08/01 13:25:44 UTC

[flink] 03/03: [FLINK-13326] Support async rawState checkpointing

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c676eb42f151854b4e734f39e2d82565eda144b6
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Wed Jul 17 15:23:21 2019 +0200

    [FLINK-13326] Support async rawState checkpointing
    
    This commit adds the ability for users to obtain a lease for the
    rawOperator/rawKeyed output streams during the synchronous part
    of a snapshot, thus preventing these streams from being closed.
---
 .../state/StateSnapshotContextSynchronousImpl.java | 108 ++++++++++++++-------
 .../api/operators/AbstractStreamOperator.java      |  19 ++--
 .../api/operators/AbstractStreamOperatorTest.java  |   4 -
 .../StateSnapshotContextSynchronousImplTest.java   |  43 +++++++-
 4 files changed, 128 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index e0f9071..1e52676 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -20,34 +20,35 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.AsyncSnapshotCallable.AsyncSnapshotTask;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.RunnableFuture;
 
 /**
  * This class is a default implementation for StateSnapshotContext.
  */
-public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
 
 	/** Checkpoint id of the snapshot. */
 	private final long checkpointId;
 
 	/** Checkpoint timestamp of the snapshot. */
 	private final long checkpointTimestamp;
-	
-	/** Factory for the checkpointing stream */
+
+	/** Factory for the checkpointing stream. */
 	private final CheckpointStreamFactory streamFactory;
-	
-	/** Key group range for the operator that created this context. Only for keyed operators */
+
+	/** Key group range for the operator that created this context. Only for keyed operators. */
 	private final KeyGroupRange keyGroupRange;
 
 	/**
-	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be 
+	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be
 	 * obtained from and managed by the stream task.
 	 */
 	private final CloseableRegistry closableRegistry;
@@ -58,16 +59,18 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 	/** Output stream for the raw operator state. */
 	private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
 
+	private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateCheckpointClosingFuture;
+	private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateCheckpointClosingFuture;
+
 	@VisibleForTesting
 	public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp) {
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
 		this.streamFactory = null;
 		this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
-		this.closableRegistry = null;
+		this.closableRegistry = new CloseableRegistry();
 	}
 
-
 	public StateSnapshotContextSynchronousImpl(
 			long checkpointId,
 			long checkpointTimestamp,
@@ -119,31 +122,26 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 
 	@Nonnull
 	public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
-		KeyedStateHandle keyGroupsStateHandle =
-			closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
-		return toDoneFutureOfSnapshotResult(keyGroupsStateHandle);
+		if (null == keyedStateCheckpointClosingFuture) {
+			StreamCloserCallable<KeyGroupsStateHandle> callable = new StreamCloserCallable<>(closableRegistry, keyedStateCheckpointOutputStream);
+			AsyncSnapshotTask asyncSnapshotTask = callable.toAsyncSnapshotFutureTask(closableRegistry);
+			keyedStateCheckpointClosingFuture = castAsKeyedStateHandle(asyncSnapshotTask);
+		}
+		return keyedStateCheckpointClosingFuture;
 	}
 
 	@Nonnull
 	public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
-		OperatorStateHandle operatorStateHandle =
-			closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
-		return toDoneFutureOfSnapshotResult(operatorStateHandle);
-	}
-
-	private <T extends StateObject> RunnableFuture<SnapshotResult<T>> toDoneFutureOfSnapshotResult(T handle) {
-		SnapshotResult<T> snapshotResult = SnapshotResult.of(handle);
-		return DoneFuture.of(snapshotResult);
+		if (null == operatorStateCheckpointClosingFuture) {
+			StreamCloserCallable<OperatorStateHandle> callable = new StreamCloserCallable<>(closableRegistry, operatorStateCheckpointOutputStream);
+			operatorStateCheckpointClosingFuture = callable.toAsyncSnapshotFutureTask(closableRegistry);
+		}
+		return operatorStateCheckpointClosingFuture;
 	}
 
-	private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(
-		NonClosingCheckpointOutputStream<T> stream) throws IOException {
-
-		if (stream != null && closableRegistry.unregisterCloseable(stream.getDelegate())) {
-			return stream.closeAndGetHandle();
-		} else {
-			return null;
-		}
+	@SuppressWarnings("unchecked")
+	private static RunnableFuture<SnapshotResult<KeyedStateHandle>> castAsKeyedStateHandle(RunnableFuture<?> asyncSnapshotTask) {
+		return (RunnableFuture<SnapshotResult<KeyedStateHandle>>) asyncSnapshotTask;
 	}
 
 	private <T extends StreamStateHandle> void closeAndUnregisterStream(
@@ -158,30 +156,70 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
 		}
 	}
 
-	@Override
-	public void close() throws IOException {
+	public void closeExceptionally() throws IOException {
 		IOException exception = null;
-
 		if (keyedStateCheckpointOutputStream != null) {
 			try {
 				closeAndUnregisterStream(keyedStateCheckpointOutputStream);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e);
 			}
 		}
-
 		if (operatorStateCheckpointOutputStream != null) {
 			try {
 				closeAndUnregisterStream(operatorStateCheckpointOutputStream);
-			} catch (IOException e) {
+			}
+			catch (IOException e) {
 				exception = ExceptionUtils.firstOrSuppressed(
 					new IOException("Could not close the raw operator state checkpoint output stream.", e),
 					exception);
 			}
 		}
-
+		if (keyedStateCheckpointClosingFuture != null) {
+			keyedStateCheckpointClosingFuture.cancel(true);
+		}
+		if (operatorStateCheckpointClosingFuture != null) {
+			operatorStateCheckpointClosingFuture.cancel(true);
+		}
 		if (exception != null) {
 			throw exception;
 		}
 	}
+
+	private static final class StreamCloserCallable<T extends StreamStateHandle> extends AsyncSnapshotCallable<SnapshotResult<T>> {
+
+		@Nullable
+		private final NonClosingCheckpointOutputStream<T> stream;
+		private final CloseableRegistry closableRegistry;
+
+		StreamCloserCallable(CloseableRegistry closableRegistry, @Nullable NonClosingCheckpointOutputStream<T> stream) {
+			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+			this.stream = stream;
+		}
+
+		@Override
+		protected SnapshotResult<T> callInternal() throws Exception {
+			if (stream == null) {
+				return SnapshotResult.of(null);
+			}
+			if (!closableRegistry.unregisterCloseable(stream.getDelegate())) {
+				throw new IOException("Stream delegate appears to be closed, because it is no longer registered.");
+			}
+			T closed = stream.closeAndGetHandle();
+			return SnapshotResult.of(closed);
+		}
+
+		@Override
+		protected void cleanupProvidedResources() {
+			try {
+				if (stream != null && closableRegistry.unregisterCloseable(stream.getDelegate())) {
+					stream.closeAndGetHandle();
+				}
+			}
+			catch (IOException e) {
+				throw new IllegalStateException("Unable to cleanup a stream.", e);
+			}
+		}
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7d2eda5..2e4310a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -73,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Locale;
 
@@ -387,13 +388,14 @@ public abstract class AbstractStreamOperator<OUT>
 
 		OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
 
-		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
-				checkpointId,
-				timestamp,
-				factory,
-				keyGroupRange,
-				getContainingTask().getCancelables())) {
+		StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
+			checkpointId,
+			timestamp,
+			factory,
+			keyGroupRange,
+			getContainingTask().getCancelables());
 
+		try {
 			snapshotState(snapshotContext);
 
 			snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
@@ -421,6 +423,11 @@ public abstract class AbstractStreamOperator<OUT>
 			if (!getContainingTask().isCanceled()) {
 				LOG.info(snapshotFailMessage, snapshotException);
 			}
+			try {
+				snapshotContext.closeExceptionally();
+			} catch (IOException e) {
+				snapshotException.addSuppressed(e);
+			}
 			throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
 		}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 28e0ac4..36fb867 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -521,7 +521,6 @@ public class AbstractStreamOperatorTest {
 				CheckpointOptions.forCheckpointWithDefaultLocation(),
 				new MemCheckpointStreamFactory(Integer.MAX_VALUE));
 
-		verify(context).close();
 	}
 
 	/**
@@ -561,8 +560,6 @@ public class AbstractStreamOperatorTest {
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
 		}
-
-		verify(context).close();
 	}
 
 	/**
@@ -644,7 +641,6 @@ public class AbstractStreamOperatorTest {
 
 		// verify that the context has been closed, the operator snapshot result has been cancelled
 		// and that all futures have been cancelled.
-		verify(context).close();
 		verify(operatorSnapshotResult).cancel();
 
 		verify(futureKeyedStateHandle).cancel(anyBoolean());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 3a8250f..9c8c7ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -110,7 +110,48 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
 		assertTrue(closableRegistry.contains(outputStream1));
 		assertTrue(closableRegistry.contains(outputStream2));
 
-		context.close();
+		context.getKeyedStateStreamFuture().run();
+		context.getOperatorStateStreamFuture().run();
+
+		verify(outputStream1).closeAndGetHandle();
+		verify(outputStream2).closeAndGetHandle();
+
+		assertEquals(0, closableRegistry.size());
+	}
+
+	@Test
+	public void testStreamClosingExceptionally() throws Exception {
+		long checkpointId = 42L;
+		long checkpointTimestamp = 1L;
+
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
+
+		InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
+
+		KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
+
+		StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
+			checkpointId,
+			checkpointTimestamp,
+			streamFactory,
+			keyGroupRange,
+			closableRegistry);
+
+		// creating the output streams
+		context.getRawKeyedOperatorStateOutput();
+		context.getRawOperatorStateOutput();
+
+		verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+
+		assertEquals(2, closableRegistry.size());
+		assertTrue(closableRegistry.contains(outputStream1));
+		assertTrue(closableRegistry.contains(outputStream2));
+
+		context.closeExceptionally();
 
 		verify(outputStream1).close();
 		verify(outputStream2).close();