You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/08/01 13:25:44 UTC
[flink] 03/03: [FLINK-13326] Support async rawState checkpointing
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c676eb42f151854b4e734f39e2d82565eda144b6
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Wed Jul 17 15:23:21 2019 +0200
[FLINK-13326] Support async rawState checkpointing
This commit adds the ability for users to obtain a lease for the
rawOperator/rawKeyed output streams, during the synchronous part
of a snapshot, thus preventing these streams from being closed.
---
.../state/StateSnapshotContextSynchronousImpl.java | 108 ++++++++++++++-------
.../api/operators/AbstractStreamOperator.java | 19 ++--
.../api/operators/AbstractStreamOperatorTest.java | 4 -
.../StateSnapshotContextSynchronousImplTest.java | 43 +++++++-
4 files changed, 128 insertions(+), 46 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index e0f9071..1e52676 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -20,34 +20,35 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.AsyncSnapshotCallable.AsyncSnapshotTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.RunnableFuture;
/**
* This class is a default implementation for StateSnapshotContext.
*/
-public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
/** Checkpoint id of the snapshot. */
private final long checkpointId;
/** Checkpoint timestamp of the snapshot. */
private final long checkpointTimestamp;
-
- /** Factory for the checkpointing stream */
+
+ /** Factory for the checkpointing stream. */
private final CheckpointStreamFactory streamFactory;
-
- /** Key group range for the operator that created this context. Only for keyed operators */
+
+ /** Key group range for the operator that created this context. Only for keyed operators. */
private final KeyGroupRange keyGroupRange;
/**
- * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be
+ * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be
* obtained from and managed by the stream task.
*/
private final CloseableRegistry closableRegistry;
@@ -58,16 +59,18 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
/** Output stream for the raw operator state. */
private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;
+ private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateCheckpointClosingFuture;
+ private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateCheckpointClosingFuture;
+
@VisibleForTesting
public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp) {
this.checkpointId = checkpointId;
this.checkpointTimestamp = checkpointTimestamp;
this.streamFactory = null;
this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
- this.closableRegistry = null;
+ this.closableRegistry = new CloseableRegistry();
}
-
public StateSnapshotContextSynchronousImpl(
long checkpointId,
long checkpointTimestamp,
@@ -119,31 +122,26 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
@Nonnull
public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
- KeyedStateHandle keyGroupsStateHandle =
- closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
- return toDoneFutureOfSnapshotResult(keyGroupsStateHandle);
+ if (null == keyedStateCheckpointClosingFuture) {
+ StreamCloserCallable<KeyGroupsStateHandle> callable = new StreamCloserCallable<>(closableRegistry, keyedStateCheckpointOutputStream);
+ AsyncSnapshotTask asyncSnapshotTask = callable.toAsyncSnapshotFutureTask(closableRegistry);
+ keyedStateCheckpointClosingFuture = castAsKeyedStateHandle(asyncSnapshotTask);
+ }
+ return keyedStateCheckpointClosingFuture;
}
@Nonnull
public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
- OperatorStateHandle operatorStateHandle =
- closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
- return toDoneFutureOfSnapshotResult(operatorStateHandle);
- }
-
- private <T extends StateObject> RunnableFuture<SnapshotResult<T>> toDoneFutureOfSnapshotResult(T handle) {
- SnapshotResult<T> snapshotResult = SnapshotResult.of(handle);
- return DoneFuture.of(snapshotResult);
+ if (null == operatorStateCheckpointClosingFuture) {
+ StreamCloserCallable<OperatorStateHandle> callable = new StreamCloserCallable<>(closableRegistry, operatorStateCheckpointOutputStream);
+ operatorStateCheckpointClosingFuture = callable.toAsyncSnapshotFutureTask(closableRegistry);
+ }
+ return operatorStateCheckpointClosingFuture;
}
- private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(
- NonClosingCheckpointOutputStream<T> stream) throws IOException {
-
- if (stream != null && closableRegistry.unregisterCloseable(stream.getDelegate())) {
- return stream.closeAndGetHandle();
- } else {
- return null;
- }
+ @SuppressWarnings("unchecked")
+ private static RunnableFuture<SnapshotResult<KeyedStateHandle>> castAsKeyedStateHandle(RunnableFuture<?> asyncSnapshotTask) {
+ return (RunnableFuture<SnapshotResult<KeyedStateHandle>>) asyncSnapshotTask;
}
private <T extends StreamStateHandle> void closeAndUnregisterStream(
@@ -158,30 +156,70 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
}
}
- @Override
- public void close() throws IOException {
+ public void closeExceptionally() throws IOException {
IOException exception = null;
-
if (keyedStateCheckpointOutputStream != null) {
try {
closeAndUnregisterStream(keyedStateCheckpointOutputStream);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e);
}
}
-
if (operatorStateCheckpointOutputStream != null) {
try {
closeAndUnregisterStream(operatorStateCheckpointOutputStream);
- } catch (IOException e) {
+ }
+ catch (IOException e) {
exception = ExceptionUtils.firstOrSuppressed(
new IOException("Could not close the raw operator state checkpoint output stream.", e),
exception);
}
}
-
+ if (keyedStateCheckpointClosingFuture != null) {
+ keyedStateCheckpointClosingFuture.cancel(true);
+ }
+ if (operatorStateCheckpointClosingFuture != null) {
+ operatorStateCheckpointClosingFuture.cancel(true);
+ }
if (exception != null) {
throw exception;
}
}
+
+ private static final class StreamCloserCallable<T extends StreamStateHandle> extends AsyncSnapshotCallable<SnapshotResult<T>> {
+
+ @Nullable
+ private final NonClosingCheckpointOutputStream<T> stream;
+ private final CloseableRegistry closableRegistry;
+
+ StreamCloserCallable(CloseableRegistry closableRegistry, @Nullable NonClosingCheckpointOutputStream<T> stream) {
+ this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+ this.stream = stream;
+ }
+
+ @Override
+ protected SnapshotResult<T> callInternal() throws Exception {
+ if (stream == null) {
+ return SnapshotResult.of(null);
+ }
+ if (!closableRegistry.unregisterCloseable(stream.getDelegate())) {
+ throw new IOException("Stream delegate appears to be closed, because it is no longer registered.");
+ }
+ T closed = stream.closeAndGetHandle();
+ return SnapshotResult.of(closed);
+ }
+
+ @Override
+ protected void cleanupProvidedResources() {
+ try {
+ if (stream != null && closableRegistry.unregisterCloseable(stream.getDelegate())) {
+ stream.closeAndGetHandle();
+ }
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Unable to cleanup a stream.", e);
+ }
+ }
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7d2eda5..2e4310a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -73,6 +73,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Locale;
@@ -387,13 +388,14 @@ public abstract class AbstractStreamOperator<OUT>
OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
- try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
- checkpointId,
- timestamp,
- factory,
- keyGroupRange,
- getContainingTask().getCancelables())) {
+ StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
+ checkpointId,
+ timestamp,
+ factory,
+ keyGroupRange,
+ getContainingTask().getCancelables());
+ try {
snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
@@ -421,6 +423,11 @@ public abstract class AbstractStreamOperator<OUT>
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
+ try {
+ snapshotContext.closeExceptionally();
+ } catch (IOException e) {
+ snapshotException.addSuppressed(e);
+ }
throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 28e0ac4..36fb867 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -521,7 +521,6 @@ public class AbstractStreamOperatorTest {
CheckpointOptions.forCheckpointWithDefaultLocation(),
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
- verify(context).close();
}
/**
@@ -561,8 +560,6 @@ public class AbstractStreamOperatorTest {
} catch (Exception e) {
assertEquals(failingException, e.getCause());
}
-
- verify(context).close();
}
/**
@@ -644,7 +641,6 @@ public class AbstractStreamOperatorTest {
// verify that the context has been closed, the operator snapshot result has been cancelled
// and that all futures have been cancelled.
- verify(context).close();
verify(operatorSnapshotResult).cancel();
verify(futureKeyedStateHandle).cancel(anyBoolean());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 3a8250f..9c8c7ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -110,7 +110,48 @@ public class StateSnapshotContextSynchronousImplTest extends TestLogger {
assertTrue(closableRegistry.contains(outputStream1));
assertTrue(closableRegistry.contains(outputStream2));
- context.close();
+ context.getKeyedStateStreamFuture().run();
+ context.getOperatorStateStreamFuture().run();
+
+ verify(outputStream1).closeAndGetHandle();
+ verify(outputStream2).closeAndGetHandle();
+
+ assertEquals(0, closableRegistry.size());
+ }
+
+ @Test
+ public void testStreamClosingExceptionally() throws Exception {
+ long checkpointId = 42L;
+ long checkpointTimestamp = 1L;
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+ when(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)).thenReturn(outputStream1, outputStream2);
+
+ InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
+
+ KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
+
+ StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
+ checkpointId,
+ checkpointTimestamp,
+ streamFactory,
+ keyGroupRange,
+ closableRegistry);
+
+ // creating the output streams
+ context.getRawKeyedOperatorStateOutput();
+ context.getRawOperatorStateOutput();
+
+ verify(streamFactory, times(2)).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+
+ assertEquals(2, closableRegistry.size());
+ assertTrue(closableRegistry.contains(outputStream1));
+ assertTrue(closableRegistry.contains(outputStream2));
+
+ context.closeExceptionally();
verify(outputStream1).close();
verify(outputStream2).close();