You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/23 16:37:31 UTC
[1/2] flink git commit: [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation
Repository: flink
Updated Branches:
refs/heads/release-1.2 e0a784197 -> 840b779c5
[FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation
Adds exception handling to the snapshotState method of the stream operators. A failing
snapshot operation now triggers the cleanup of all state resources generated so far.
This prevents resources (e.g. files) from being left behind when a snapshot operation
fails.
Add test case for OperatorSnapshotResult
Add StateSnapshotContextSynchronousImplTest
Add AbstractStreamOperator failing snapshot tests
This closes #3178.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/006fcc44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/006fcc44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/006fcc44
Branch: refs/heads/release-1.2
Commit: 006fcc4443f837d1384b8e85f9fb6dd048d2f743
Parents: e0a78419
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 1 13:25:05 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 17:36:47 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 12 +-
.../state/RocksDBAsyncSnapshotTest.java | 77 ++++++++-
.../runtime/state/StateSnapshotContext.java | 2 +-
.../StateSnapshotContextSynchronousImpl.java | 41 ++++-
.../filesystem/FsCheckpointStreamFactory.java | 67 +++++---
.../FsCheckpointStateOutputStreamTest.java | 73 +++++++++
.../source/ContinuousFileReaderOperator.java | 15 +-
.../functions/util/StreamingFunctionUtils.java | 11 +-
.../api/operators/AbstractStreamOperator.java | 92 ++++++++---
.../api/operators/OperatorSnapshotResult.java | 56 ++++++-
.../api/operators/async/AsyncWaitOperator.java | 21 ++-
.../operators/GenericWriteAheadSink.java | 14 +-
.../streaming/runtime/tasks/StreamTask.java | 42 +++--
.../operators/AbstractStreamOperatorTest.java | 157 ++++++++++++++++++-
.../operators/OperatorSnapshotResultTest.java | 97 +++++-------
...StateSnapshotContextSynchronousImplTest.java | 73 ++++++++-
16 files changed, 698 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index b207af6..6587ca5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -284,8 +284,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " +
- Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+ LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
+ streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
return snapshotOperation.getSnapshotResultStateHandle();
}
@@ -346,7 +346,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @param checkpointId id of the checkpoint for which we take the snapshot
* @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
*/
- public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) throws IOException {
+ public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
this.checkpointId = checkpointId;
@@ -427,8 +427,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (null != snapshotResultStateHandle) {
snapshotResultStateHandle.discardState();
}
- } catch (Exception ignored) {
- LOG.warn("Exception occurred during snapshot state handle cleanup: " + ignored);
+ } catch (Exception e) {
+ LOG.warn("Exception occurred during snapshot state handle cleanup.", e);
}
}
}
@@ -452,7 +452,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return snapshotResultStateHandle;
}
- private void writeKVStateMetaData() throws IOException, InterruptedException {
+ private void writeKVStateMetaData() throws IOException {
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
new ArrayList<>(stateBackend.kvStateInformation.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 70f74b0..46a184a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -31,8 +32,14 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -47,6 +54,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.FutureUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -69,10 +77,21 @@ import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
/**
* Tests for asynchronous RocksDB Key/Value state checkpoints.
@@ -182,7 +201,7 @@ public class RocksDBAsyncSnapshotTest {
testHarness.waitForTaskCompletion();
if (mockEnv.wasFailedExternally()) {
- Assert.fail("Unexpected exception during execution.");
+ fail("Unexpected exception during execution.");
}
}
@@ -259,7 +278,7 @@ public class RocksDBAsyncSnapshotTest {
if (mockEnv.wasFailedExternally()) {
throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
}
- Assert.fail("Operation completed. Cancel failed.");
+ fail("Operation completed. Cancel failed.");
} catch (Exception expected) {
AsynchronousException asynchronousException = null;
@@ -268,7 +287,7 @@ public class RocksDBAsyncSnapshotTest {
} else if (expected.getCause() instanceof AsynchronousException) {
asynchronousException = (AsynchronousException) expected.getCause();
} else {
- Assert.fail("Unexpected exception: " + expected);
+ fail("Unexpected exception: " + expected);
}
// we expect the exception from canceling snapshots
@@ -279,6 +298,58 @@ public class RocksDBAsyncSnapshotTest {
}
}
+ /**
+ * Test that the snapshot files are cleaned up in case of a failure during the snapshot
+ * procedure.
+ */
+ @Test
+ public void testCleanupOfSnapshotsInFailureCase() throws Exception {
+ long checkpointId = 1L;
+ long timestamp = 42L;
+
+ Environment env = new DummyEnvironment("test task", 1, 0);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outputStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory checkpointStreamFactory = mock(CheckpointStreamFactory.class);
+ AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
+
+ final IOException testException = new IOException("Test exception");
+
+ doReturn(checkpointStreamFactory).when(stateBackend).createStreamFactory(any(JobID.class), anyString());
+ doThrow(testException).when(outputStream).write(anyInt());
+ doReturn(outputStream).when(checkpointStreamFactory).createCheckpointStateOutputStream(eq(checkpointId), eq(timestamp));
+
+ RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
+
+ backend.setDbStoragePath("file:///tmp/foobar");
+
+ AbstractKeyedStateBackend<Void> keyedStateBackend = backend.createKeyedStateBackend(
+ env,
+ new JobID(),
+ "test operator",
+ VoidSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ null);
+
+ // register a state so that the state backend has to checkpoint something
+ keyedStateBackend.getPartitionedState(
+ "namespace",
+ StringSerializer.INSTANCE,
+ new ValueStateDescriptor<>("foobar", String.class));
+
+ RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+
+ try {
+ FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+ fail("Expected an exception to be thrown here.");
+ } catch (ExecutionException e) {
+ Assert.assertEquals(testException, e.getCause());
+ }
+
+ verify(outputStream).close();
+ }
+
@Test
public void testConsistentSnapshotSerializationFlagsAndMasks() {
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
index 4dbbeaf..e5a748b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
@@ -37,4 +37,4 @@ public interface StateSnapshotContext extends FunctionSnapshotContext {
*/
OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index ce8a6c4..96edccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -20,15 +20,17 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
+import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.RunnableFuture;
/**
* This class is a default implementation for StateSnapshotContext.
*/
-public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext, Closeable {
private final long checkpointId;
private final long checkpointTimestamp;
@@ -127,4 +129,39 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
return new DoneFuture<>(stream.closeAndGetHandle());
}
-}
\ No newline at end of file
+ /**
+  * Releases the underlying delegate of the given non-closing checkpoint output stream:
+  * the delegate is first unregistered from the closable registry and then closed.
+  *
+  * <p>Because unregistration happens before closing, the registry no longer tracks the
+  * delegate even if closing it throws.
+  *
+  * @param stream the raw state output stream whose delegate should be released; must not be null
+  * @throws IOException if closing the delegate stream fails
+  */
+ private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
+ Preconditions.checkNotNull(stream);
+
+ closableRegistry.unregisterClosable(stream.getDelegate());
+ stream.getDelegate().close();
+ }
+
+ /**
+  * Closes and unregisters the raw keyed state and raw operator state checkpoint output
+  * streams, for whichever of them have been opened.
+  *
+  * <p>Both streams are attempted even if closing the first one fails. Failures are combined
+  * via ExceptionUtils.firstOrSuppressed, presumably keeping the first failure and attaching
+  * later ones as suppressed exceptions (confirm against ExceptionUtils).
+  *
+  * <p>NOTE(review): when this context is used in try-with-resources after the state stream
+  * futures were obtained, the delegates have likely already been closed by
+  * closeAndGetHandle(); this relies on a repeated close() being harmless - confirm.
+  *
+  * @throws IOException if closing either stream fails
+  */
+ @Override
+ public void close() throws IOException {
+ IOException exception = null;
+
+ if (keyedStateCheckpointOutputStream != null) {
+ try {
+ closeAndUnregisterStream(keyedStateCheckpointOutputStream);
+ } catch (IOException e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new IOException("Could not close the raw keyed state checkpoint output stream.", e),
+ exception);
+ }
+ }
+
+ if (operatorStateCheckpointOutputStream != null) {
+ try {
+ closeAndUnregisterStream(operatorStateCheckpointOutputStream);
+ } catch (IOException e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new IOException("Could not close the raw operator state checkpoint output stream.", e),
+ exception);
+ }
+ }
+
+ // rethrow the aggregated failure only after both streams have been attempted
+ if (exception != null) {
+ throw exception;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 1be3abf..30b1da6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -266,17 +266,21 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
if (outStream != null) {
try {
outStream.close();
- fs.delete(statePath, false);
-
+ } catch (Throwable throwable) {
+ LOG.warn("Could not close the state stream for {}.", statePath, throwable);
+ } finally {
try {
- FileUtils.deletePathIfEmpty(fs, basePath);
- } catch (Exception ignored) {
- LOG.debug("Could not delete the parent directory {}.", basePath, ignored);
+ fs.delete(statePath, false);
+
+ try {
+ FileUtils.deletePathIfEmpty(fs, basePath);
+ } catch (Exception ignored) {
+ LOG.debug("Could not delete the parent directory {}.", basePath, ignored);
+ }
+ } catch (Exception e) {
+ LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e);
}
}
- catch (Exception e) {
- LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e);
- }
}
}
}
@@ -297,20 +301,41 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
return new ByteStreamStateHandle(createStatePath().toString(), bytes);
}
else {
- flush();
-
- closed = true;
- pos = writeBuffer.length;
-
- long size = -1;
- // make a best effort attempt to figure out the size
try {
- size = outStream.getPos();
- } catch (Exception ignored) {}
-
- outStream.close();
-
- return new FileStateHandle(statePath, size);
+ flush();
+
+ pos = writeBuffer.length;
+
+ long size = -1L;
+
+ // make a best effort attempt to figure out the size
+ try {
+ size = outStream.getPos();
+ } catch (Exception ignored) {}
+
+ outStream.close();
+
+ return new FileStateHandle(statePath, size);
+ } catch (Exception exception) {
+ try {
+ fs.delete(statePath, false);
+
+ try {
+ FileUtils.deletePathIfEmpty(fs, basePath);
+ } catch (Exception parentDirDeletionFailure) {
+ LOG.debug("Could not delete the parent directory {}.", basePath, parentDirDeletionFailure);
+ }
+ } catch (Exception deleteException) {
+ LOG.warn("Could not delete the checkpoint stream file {}.",
+ statePath, deleteException);
+ }
+
+ throw new IOException("Could not flush and close the file system " +
+ "output stream to " + statePath + " in order to obtain the " +
+ "stream state handle", exception);
+ } finally {
+ closed = true;
+ }
}
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 6d371b1..8617193 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -18,14 +18,17 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.io.DataInputStream;
import java.io.File;
@@ -37,6 +40,13 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class FsCheckpointStateOutputStreamTest {
@@ -108,6 +118,69 @@ public class FsCheckpointStateOutputStreamTest {
stream.closeAndGetHandle();
}
+ /**
+ * Tests that the underlying stream file is deleted upon calling close.
+ */
+ @Test
+ public void testCleanupWhenClosingStream() throws IOException {
+
+ final FileSystem fs = mock(FileSystem.class);
+ final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+ final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+ when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH,
+ fs,
+ 4,
+ 0);
+
+ // this should create the underlying file stream
+ stream.write(new byte[]{1,2,3,4,5});
+
+ verify(fs).create(any(Path.class), anyBoolean());
+
+ stream.close();
+
+ verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+ }
+
+ /**
+ * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails.
+ */
+ @Test
+ public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
+ final FileSystem fs = mock(FileSystem.class);
+ final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+ final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+
+ when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream);
+ doThrow(new IOException("Test IOException.")).when(outputStream).close();
+
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH,
+ fs,
+ 4,
+ 0);
+
+ // this should create the underlying file stream
+ stream.write(new byte[]{1,2,3,4,5});
+
+ verify(fs).create(any(Path.class), anyBoolean());
+
+ try {
+ stream.closeAndGetHandle();
+ fail("Expected IOException");
+ } catch (IOException ioE) {
+ // expected exception
+ }
+
+ verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
+ }
+
private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 6419aa6..ab1ad1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -394,10 +394,19 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
checkpointedState.clear();
+
List<TimestampedFileInputSplit> readerState = reader.getReaderState();
- for (TimestampedFileInputSplit split : readerState) {
- // create a new partition for each entry.
- checkpointedState.add(split);
+
+ try {
+ for (TimestampedFileInputSplit split : readerState) {
+ // create a new partition for each entry.
+ checkpointedState.add(split);
+ }
+ } catch (Exception e) {
+ checkpointedState.clear();
+
+ throw new Exception("Could not add timestamped file input splits to to operator " +
+ "state backend of operator " + getOperatorName() + '.', e);
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
index d1d264f..679ef0b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java
@@ -131,8 +131,15 @@ public final class StreamingFunctionUtils {
listState.clear();
if (null != partitionableState) {
- for (Serializable statePartition : partitionableState) {
- listState.add(statePartition);
+ try {
+ for (Serializable statePartition : partitionableState) {
+ listState.add(statePartition);
+ }
+ } catch (Exception e) {
+ listState.clear();
+
+ throw new Exception("Could not write partitionable state to operator " +
+ "state backend.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a21660c..16fe2c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -339,24 +339,38 @@ public abstract class AbstractStreamOperator<OUT>
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
- StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
- checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables());
+ OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
- snapshotState(snapshotContext);
+ try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
+ checkpointId,
+ timestamp,
+ streamFactory,
+ keyGroupRange,
+ getContainingTask().getCancelables())) {
- OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
+ snapshotState(snapshotContext);
- snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
- snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
+ snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
+ snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
- if (null != operatorStateBackend) {
- snapshotInProgress.setOperatorStateManagedFuture(
+ if (null != operatorStateBackend) {
+ snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
- }
+ }
- if (null != keyedStateBackend) {
- snapshotInProgress.setKeyedStateManagedFuture(
+ if (null != keyedStateBackend) {
+ snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+ }
+ } catch (Exception snapshotException) {
+ try {
+ snapshotInProgress.cancel();
+ } catch (Exception e) {
+ snapshotException.addSuppressed(e);
+ }
+
+ throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
+ getOperatorName() + '.', snapshotException);
}
return snapshotInProgress;
@@ -369,21 +383,40 @@ public abstract class AbstractStreamOperator<OUT>
*/
public void snapshotState(StateSnapshotContext context) throws Exception {
if (getKeyedStateBackend() != null) {
- KeyedStateCheckpointOutputStream out = context.getRawKeyedOperatorStateOutput();
+ KeyedStateCheckpointOutputStream out;
- KeyGroupsList allKeyGroups = out.getKeyGroupList();
- for (int keyGroupIdx : allKeyGroups) {
- out.startNewKeyGroup(keyGroupIdx);
+ try {
+ out = context.getRawKeyedOperatorStateOutput();
+ } catch (Exception exception) {
+ throw new Exception("Could not open raw keyed operator state stream for " +
+ getOperatorName() + '.', exception);
+ }
- DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
- dov.writeInt(timerServices.size());
+ try {
+ KeyGroupsList allKeyGroups = out.getKeyGroupList();
+ for (int keyGroupIdx : allKeyGroups) {
+ out.startNewKeyGroup(keyGroupIdx);
- for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
- String serviceName = entry.getKey();
- HeapInternalTimerService<?, ?> timerService = entry.getValue();
+ DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+ dov.writeInt(timerServices.size());
- dov.writeUTF(serviceName);
- timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
+ for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
+ String serviceName = entry.getKey();
+ HeapInternalTimerService<?, ?> timerService = entry.getValue();
+
+ dov.writeUTF(serviceName);
+ timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
+ }
+ }
+ } catch (Exception exception) {
+ throw new Exception("Could not write timer service of " + getOperatorName() +
+ " to checkpoint state stream.", exception);
+ } finally {
+ try {
+ out.close();
+ } catch (Exception closeException) {
+ LOG.warn("Could not close raw keyed operator state stream for {}. This " +
+ "might have prevented deleting some state data.", getOperatorName(), closeException);
}
}
}
@@ -457,6 +490,21 @@ public abstract class AbstractStreamOperator<OUT>
public ClassLoader getUserCodeClassloader() {
return container.getUserCodeClassLoader();
}
+
+ /**
+ * Return the operator name. If the runtime context has been set, then the task name with
+ * subtask index is returned. Otherwise, the simple class name is returned.
+ *
+ * @return If runtime context is set, then return task name with subtask index. Otherwise return
+ * simple class name.
+ */
+ protected String getOperatorName() {
+ if (runtimeContext != null) {
+ return runtimeContext.getTaskNameWithSubtasks();
+ } else {
+ return getClass().getSimpleName();
+ }
+ }
/**
* Returns a context that allows the operator to query information about the execution and also
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 8b9f758..913928f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FutureUtil;
-import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
/**
@@ -81,16 +83,54 @@ public class OperatorSnapshotResult {
this.operatorStateRawFuture = operatorStateRawFuture;
}
- public void cancel() {
- cancelIfNotNull(getKeyedStateManagedFuture());
- cancelIfNotNull(getOperatorStateManagedFuture());
- cancelIfNotNull(getKeyedStateRawFuture());
- cancelIfNotNull(getOperatorStateRawFuture());
+ public void cancel() throws Exception {
+ Exception exception = null;
+
+ try {
+ cancelIfNotNull(getKeyedStateManagedFuture());
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new Exception("Could not properly cancel managed keyed state future.", e),
+ exception);
+ }
+
+ try {
+ cancelIfNotNull(getOperatorStateManagedFuture());
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new Exception("Could not properly cancel managed operator state future.", e),
+ exception);
+ }
+
+ try {
+ cancelIfNotNull(getKeyedStateRawFuture());
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new Exception("Could not properly cancel raw keyed state future.", e),
+ exception);
+ }
+
+ try {
+ cancelIfNotNull(getOperatorStateRawFuture());
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new Exception("Could not properly cancel raw operator state future.", e),
+ exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
}
- private static void cancelIfNotNull(Future<?> future) {
+ private static <T extends StreamStateHandle> void cancelIfNotNull(RunnableFuture<T> future) throws Exception {
if (null != future) {
- future.cancel(true);
+ if (!future.cancel(true)) {
+ // the cancellation was not successful because it might have been completed before
+ StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future);
+
+ streamStateHandle.discardState();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 754b754..f43f8b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -231,18 +231,25 @@ public class AsyncWaitOperator<IN, OUT>
super.snapshotState(context);
ListState<StreamElement> partitionableState =
- getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+ getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
Collection<StreamElementQueueEntry<?>> values = queue.values();
- for (StreamElementQueueEntry<?> value : values) {
- partitionableState.add(value.getStreamElement());
- }
+ try {
+ for (StreamElementQueueEntry<?> value : values) {
+ partitionableState.add(value.getStreamElement());
+ }
+
+ // add the pending stream element queue entry if the stream element queue is currently full
+ if (pendingStreamElementQueueEntry != null) {
+ partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
+ }
+ } catch (Exception e) {
+ partitionableState.clear();
- // add the pending stream element queue entry if the stream element queue is currently full
- if (pendingStreamElementQueueEntry != null) {
- partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
+ throw new Exception("Could not add stream element queue entries to operator state " +
+ "backend of operator " + getOperatorName() + '.', e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 564fa22..7a571ec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -162,9 +162,17 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());
this.checkpointedState.clear();
- for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
- // create a new partition for each entry.
- this.checkpointedState.add(pendingCheckpoint);
+
+ try {
+ for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
+ // create a new partition for each entry.
+ this.checkpointedState.add(pendingCheckpoint);
+ }
+ } catch (Exception e) {
+ checkpointedState.clear();
+
+ throw new Exception("Could not add panding checkpoints to operator state " +
+ "backend of operator " + getOperatorName() + '.', e);
}
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 530401b..775475d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -57,6 +57,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -590,8 +591,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
+ Exception exception = null;
+
for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
- output.writeEventToAllChannels(message);
+ try {
+ output.writeEventToAllChannels(message);
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(
+ new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
+ exception);
+ }
+ }
+
+ if (exception != null) {
+ throw exception;
}
return false;
@@ -957,7 +970,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// cleanup/release ongoing snapshot operations
for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
if (null != snapshotResult) {
- snapshotResult.cancel();
+ try {
+ snapshotResult.cancel();
+ } catch (Exception e) {
+ LOG.warn("Could not properly cancel operator snapshot result in async " +
+ "checkpoint runnable.", e);
+ }
}
}
}
@@ -1021,24 +1039,30 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}." +
- "Alignment duration: {} ms, snapshot duration {} ms",
- owner.getName(), checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
- checkpointMetaData.getSyncDurationMillis());
+ "Alignment duration: {} ms, snapshot duration {} ms",
+ owner.getName(), checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+ checkpointMetaData.getSyncDurationMillis());
}
} finally {
if (failed) {
// Cleanup to release resources
for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
if (null != operatorSnapshotResult) {
- operatorSnapshotResult.cancel();
+ try {
+ operatorSnapshotResult.cancel();
+ } catch (Exception e) {
+ LOG.warn("Could not properly cancel an operator snapshot result.", e);
+ }
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
- "Alignment duration: {} ms, snapshot duration {} ms",
- owner.getName(), checkpointMetaData.getCheckpointId());
+ "Alignment duration: {} ms, snapshot duration {} ms",
+ owner.getName(), checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getAlignmentDurationNanos() / 1_000_000,
+ checkpointMetaData.getSyncDurationMillis());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index f4051c9..409a732 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -19,35 +19,61 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.RunnableFuture;
import static junit.framework.TestCase.assertTrue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
* Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
* tests timers and state and whether they are correctly checkpointed/restored
* with key-group reshuffling.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AbstractStreamOperator.class)
public class AbstractStreamOperatorTest {
@Test
@@ -453,6 +479,133 @@ public class AbstractStreamOperatorTest {
}
/**
+ * Checks that the state snapshot context is closed after a successful snapshot operation.
+ *
+ * <p>The operator is a mock whose {@code snapshotState(long, long, CheckpointStreamFactory)}
+ * runs its real implementation, and PowerMock's {@code whenNew} replaces the constructed
+ * StateSnapshotContextSynchronousImpl with a mock so that its {@code close()} can be verified.
+ */
+ @Test
+ public void testSnapshotMethod() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+ StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+
+ // any StateSnapshotContextSynchronousImpl created inside the operator is replaced by the mock
+ whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
+
+ CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+ StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
+ when(containingTask.getCancelables()).thenReturn(closeableRegistry);
+
+ AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
+ // only the snapshot entry point runs real code; all other operator methods stay mocked
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+ doReturn(containingTask).when(operator).getContainingTask();
+
+ operator.snapshotState(checkpointId, timestamp, streamFactory);
+
+ verify(context).close();
+ }
+
+ /**
+ * Tests that the created StateSnapshotContextSynchronousImpl is closed in case of a failing
+ * Operator#snapshotState(StateSnapshotContextSynchronousImpl) call.
+ *
+ * <p>The exception thrown by the failing call is expected to surface as the cause of the
+ * exception thrown by {@code snapshotState(long, long, CheckpointStreamFactory)}.
+ */
+ @Test
+ public void testFailingSnapshotMethod() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ final Exception failingException = new Exception("Test exception");
+
+ final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+ StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+
+ // any StateSnapshotContextSynchronousImpl created inside the operator is replaced by the mock
+ whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
+
+ CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+ StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
+ when(containingTask.getCancelables()).thenReturn(closeableRegistry);
+
+ AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+ doReturn(containingTask).when(operator).getContainingTask();
+
+ // let the snapshotState(context) overload fail; the (long, long, factory) overload stubbed
+ // above still runs its real implementation and is the one called below
+ doThrow(failingException).when(operator).snapshotState(eq(context));
+
+ try {
+ operator.snapshotState(checkpointId, timestamp, streamFactory);
+ fail("Exception expected.");
+ } catch (Exception e) {
+ // the original failure must be preserved as the cause of the thrown exception
+ assertEquals(failingException, e.getCause());
+ }
+
+ // the context must be closed even though the snapshot failed
+ verify(context).close();
+ }
+
+ /**
+ * Tests that a failing snapshot method call to the keyed state backend will trigger the closing
+ * of the StateSnapshotContextSynchronousImpl and the cancellation of the
+ * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures, including
+ * the future returned by the operator state backend before the keyed state backend failed.
+ */
+ @Test
+ public void testFailingBackendSnapshotMethod() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ final Exception failingException = new Exception("Test exception");
+
+ final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+ // futures handed out by the snapshot context for its keyed and operator state streams
+ RunnableFuture<KeyGroupsStateHandle> futureKeyGroupStateHandle = mock(RunnableFuture.class);
+ RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = mock(RunnableFuture.class);
+
+ StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+ when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle);
+ when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
+
+ // spy on a real result so that cancel() both executes and can be verified
+ OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult());
+
+ whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
+ whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult);
+
+ CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+ StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
+ when(containingTask.getCancelables()).thenReturn(closeableRegistry);
+
+ AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+ doReturn(containingTask).when(operator).getContainingTask();
+
+ // future returned by the (successful) operator state backend snapshot
+ RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
+
+ OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
+ when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+
+ // the keyed state backend snapshot fails
+ AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
+ when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+
+ Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
+ Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
+
+ try {
+ operator.snapshotState(checkpointId, timestamp, streamFactory);
+ fail("Exception expected.");
+ } catch (Exception e) {
+ assertEquals(failingException, e.getCause());
+ }
+
+ // verify that the context has been closed, the operator snapshot result has been cancelled
+ // and that all futures (context futures and operator state backend future) have been cancelled.
+ verify(context).close();
+ verify(operatorSnapshotResult).cancel();
+
+ verify(futureKeyGroupStateHandle).cancel(anyBoolean());
+ verify(futureOperatorStateHandle).cancel(anyBoolean());
+ verify(futureManagedOperatorStateHandle).cancel(anyBoolean());
+ }
+
+ /**
* Extracts the result values form the test harness and clear the output queue.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
index 7e0ce5b..490df52 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java
@@ -20,80 +20,59 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.junit.Assert;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-public class OperatorSnapshotResultTest {
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+public class OperatorSnapshotResultTest extends TestLogger {
+
+ /**
+ * Tests that all runnable futures in an OperatorSnapshotResult are properly cancelled and if
+ * the StreamStateHandle result is retrievable that the state handle are discarded.
+ */
@Test
- public void testCancel() {
+ public void testCancelAndCleanup() throws Exception {
OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult();
operatorSnapshotResult.cancel();
- RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = new TestRunnableFuture<>();
- RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = new TestRunnableFuture<>();
- RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = new TestRunnableFuture<>();
- RunnableFuture<OperatorStateHandle> operatorStateRawFuture = new TestRunnableFuture<>();
-
- operatorSnapshotResult = new OperatorSnapshotResult(
- keyedStateManagedFuture,
- keyedStateRawFuture,
- operatorStateManagedFuture,
- operatorStateRawFuture);
-
- operatorSnapshotResult.cancel();
-
- Assert.assertTrue(keyedStateManagedFuture.isCancelled());
- Assert.assertTrue(keyedStateRawFuture.isCancelled());
- Assert.assertTrue(operatorStateManagedFuture.isCancelled());
- Assert.assertTrue(operatorStateRawFuture.isCancelled());
-
- }
+ KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class);
+ RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture = mock(RunnableFuture.class);
+ when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
- static final class TestRunnableFuture<T> implements RunnableFuture<T> {
+ KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class);
+ RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture = mock(RunnableFuture.class);
+ when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
- private boolean canceled;
+ OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class);
+ RunnableFuture<OperatorStateHandle> operatorStateManagedFuture = mock(RunnableFuture.class);
+ when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
- public TestRunnableFuture() {
- this.canceled = false;
- }
+ OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class);
+ RunnableFuture<OperatorStateHandle> operatorStateRawFuture = mock(RunnableFuture.class);
+ when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
- @Override
- public void run() {
-
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return canceled = true;
- }
-
- @Override
- public boolean isCancelled() {
- return canceled;
- }
+ operatorSnapshotResult = new OperatorSnapshotResult(
+ keyedStateManagedFuture,
+ keyedStateRawFuture,
+ operatorStateManagedFuture,
+ operatorStateRawFuture);
- @Override
- public boolean isDone() {
- return false;
- }
+ operatorSnapshotResult.cancel();
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return null;
- }
+ verify(keyedStateManagedFuture).cancel(true);
+ verify(keyedStateRawFuture).cancel(true);
+ verify(operatorStateManagedFuture).cancel(true);
+ verify(operatorStateRawFuture).cancel(true);
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return null;
- }
+ verify(keyedManagedStateHandle).discardState();
+ verify(keyedRawStateHandle).discardState();
+ verify(operatorManagedStateHandle).discardState();
+ verify(operatorRawStateHandle).discardState();
}
-
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/006fcc44/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 2b2df4c..277ced5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -25,11 +25,22 @@ import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-public class StateSnapshotContextSynchronousImplTest {
+import java.io.Closeable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class StateSnapshotContextSynchronousImplTest extends TestLogger {
private StateSnapshotContextSynchronousImpl snapshotContext;
@@ -43,8 +54,8 @@ public class StateSnapshotContextSynchronousImplTest {
@Test
public void testMetaData() {
- Assert.assertEquals(42, snapshotContext.getCheckpointId());
- Assert.assertEquals(4711, snapshotContext.getCheckpointTimestamp());
+ assertEquals(42, snapshotContext.getCheckpointId());
+ assertEquals(4711, snapshotContext.getCheckpointTimestamp());
}
@Test
@@ -58,4 +69,58 @@ public class StateSnapshotContextSynchronousImplTest {
OperatorStateCheckpointOutputStream stream = snapshotContext.getRawOperatorStateOutput();
Assert.assertNotNull(stream);
}
-}
\ No newline at end of file
+
+ /**
+ * Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated
+ * output streams and remove them from the closeable registry the context was given.
+ */
+ @Test
+ public void testStreamClosingWhenClosing() throws Exception {
+ long checkpointId = 42L;
+ long checkpointTimestamp = 1L;
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ // the factory returns outputStream1 on the first call and outputStream2 on the second call
+ CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+ when(streamFactory.createCheckpointStateOutputStream(eq(checkpointId), eq(checkpointTimestamp))).thenReturn(outputStream1, outputStream2);
+
+ InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry();
+
+ KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2);
+
+ StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl(
+ checkpointId,
+ checkpointTimestamp,
+ streamFactory,
+ keyGroupRange,
+ closableRegistry);
+
+ // creating the output streams
+ context.getRawKeyedOperatorStateOutput();
+ context.getRawOperatorStateOutput();
+
+ verify(streamFactory, times(2)).createCheckpointStateOutputStream(eq(checkpointId), eq(checkpointTimestamp));
+
+ // while open, both created streams are registered with the registry
+ assertEquals(2, closableRegistry.size());
+ assertTrue(closableRegistry.contains(outputStream1));
+ assertTrue(closableRegistry.contains(outputStream2));
+
+ context.close();
+
+ verify(outputStream1).close();
+ verify(outputStream2).close();
+
+ // after closing the context, nothing is left registered
+ assertEquals(0, closableRegistry.size());
+ }
+
+ /**
+ * CloseableRegistry that exposes its registration state (size and membership) so that tests
+ * can inspect which closeables are currently registered.
+ */
+ static final class InsightCloseableRegistry extends CloseableRegistry {
+ /** Returns the number of entries currently held in {@code closeableToRef}. */
+ public int size() {
+ return closeableToRef.size();
+ }
+
+ /** Returns whether the given closeable is currently a key of {@code closeableToRef}. */
+ public boolean contains(Closeable closeable) {
+ return closeableToRef.containsKey(closeable);
+ }
+ }
+}
[2/2] flink git commit: [FLINK-5229] [state] Cleanup of operator
snapshots if subsequent operator snapshots fail
Posted by tr...@apache.org.
[FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail
This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple
stream operators, then every operator is checkpointed. If a snapshot operation fails, all state
handles and OperatorSnapshotResults belonging to previously snapshotted operators have to be freed.
Add test cases for failing checkpoint operations in StreamTask
Address PR comments
This closes #3183.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/840b779c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/840b779c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/840b779c
Branch: refs/heads/release-1.2
Commit: 840b779c542462cf7bd4bed40620dd68e90ec6bd
Parents: 006fcc4
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 20 14:28:44 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 23 17:36:55 2017 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/state/StateUtil.java | 21 +++
.../api/operators/OperatorSnapshotResult.java | 22 +--
.../streaming/runtime/tasks/StreamTask.java | 56 +++++-
.../streaming/runtime/tasks/StreamTaskTest.java | 182 ++++++++++++++++++-
4 files changed, 251 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index a4799bf..19afdec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,10 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.util.FutureUtil;
+
+import java.util.concurrent.RunnableFuture;
+
/**
* Helpers for {@link StateObject} related code.
*/
@@ -61,4 +65,21 @@ public class StateUtil {
}
}
}
+
+ /**
+ * Discards the given state future by first trying to cancel it. If this is not possible, then
+ * the state object contained in the future is calculated and discarded, unless it is null.
+ *
+ * @param stateFuture to be discarded; may be null, in which case nothing happens
+ * @throws Exception if the discard operation failed
+ */
+ public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
+ if (null != stateFuture && !stateFuture.cancel(true)) {
+ // cancellation failed (e.g. the future has already completed): obtain its result instead
+ StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+ if (null != stateObject) {
+ stateObject.discardState();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 913928f..5a6c37b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
import java.util.concurrent.RunnableFuture;
@@ -87,7 +86,7 @@ public class OperatorSnapshotResult {
Exception exception = null;
try {
- cancelIfNotNull(getKeyedStateManagedFuture());
+ StateUtil.discardStateFuture(getKeyedStateManagedFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel managed keyed state future.", e),
@@ -95,7 +94,7 @@ public class OperatorSnapshotResult {
}
try {
- cancelIfNotNull(getOperatorStateManagedFuture());
+ StateUtil.discardStateFuture(getOperatorStateManagedFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel managed operator state future.", e),
@@ -103,7 +102,7 @@ public class OperatorSnapshotResult {
}
try {
- cancelIfNotNull(getKeyedStateRawFuture());
+ StateUtil.discardStateFuture(getKeyedStateRawFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel raw keyed state future.", e),
@@ -111,7 +110,7 @@ public class OperatorSnapshotResult {
}
try {
- cancelIfNotNull(getOperatorStateRawFuture());
+ StateUtil.discardStateFuture(getOperatorStateRawFuture());
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not properly cancel raw operator state future.", e),
@@ -122,15 +121,4 @@ public class OperatorSnapshotResult {
throw exception;
}
}
-
- private static <T extends StreamStateHandle> void cancelIfNotNull(RunnableFuture<T> future) throws Exception {
- if (null != future) {
- if (!future.cancel(true)) {
- // the cancellation was not successful because it might have been completed before
- StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future);
-
- streamStateHandle.discardState();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 775475d..95f9d17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -530,7 +531,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// propagate exceptions only if the task is still in "running" state
if (isRunning) {
throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
- "for operator " + getName() + '.', e);
+ " for operator " + getName() + '.', e);
} else {
LOG.debug("Could not perform checkpoint {} for operator {} while the " +
"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
@@ -953,12 +954,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
}
} catch (Exception e) {
+ try {
+ cleanup();
+ } catch (Exception cleanupException) {
+ e.addSuppressed(cleanupException);
+ }
+
// registers the exception and tries to fail the whole task
AsynchronousException asyncException = new AsynchronousException(
new Exception(
"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() +
" for operator " + owner.getName() + '.',
e));
+
owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
} finally {
owner.cancelables.unregisterClosable(this);
@@ -967,17 +975,37 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
+ /**
+ * Cleans up all resources held by this runnable. Since {@code close()} cannot throw, a failing
+ * clean up is only logged.
+ */
@Override
public void close() {
- // cleanup/release ongoing snapshot operations
- for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) {
- if (null != snapshotResult) {
+ try {
+ cleanup();
+ } catch (Exception cleanupException) {
+ LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
+ }
+ }
+
+ /**
+ * Cancels all in-progress operator snapshot results and discards the non partitioned state
+ * handles.
+ *
+ * <p>Every clean up step is attempted even if an earlier one fails. Failures are combined via
+ * {@code ExceptionUtils#firstOrSuppressed} and rethrown once all steps have been attempted.
+ *
+ * @throws Exception if any of the clean up steps failed
+ */
+ private void cleanup() throws Exception {
+ Exception exception = null;
+
+ // cancel ongoing operator snapshot results; failures are collected so that the remaining
+ // results and the non partitioned state handles are still cleaned up
+ for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) {
+ if (operatorSnapshotResult != null) {
try {
- snapshotResult.cancel();
- } catch (Exception e) {
- LOG.warn("Could not properly cancel operator snapshot result in async " +
- "checkpoint runnable.", e);
+ operatorSnapshotResult.cancel();
+ } catch (Exception cancelException) {
+ exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
}
}
}
+
+ // discard non partitioned state handles
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(nonPartitionedStateHandles);
+ } catch (Exception discardException) {
+ exception = ExceptionUtils.firstOrSuppressed(discardException, exception);
+ }
+
+ // only fail after every clean up step has been attempted
+ if (null != exception) {
+ throw exception;
+ }
}
}
@@ -1057,6 +1085,18 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
+ // Cleanup non partitioned state handles
+ for (StreamStateHandle nonPartitionedState : nonPartitionedStates) {
+ if (nonPartitionedState != null) {
+ try {
+ nonPartitionedState.discardState();
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard a non partitioned " +
+ "state. This might leave some orphaned files behind.", e);
+ }
+ }
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." +
"Alignment duration: {} ms, snapshot duration {} ms",
http://git-wip-us.apache.org/repos/asf/flink/blob/840b779c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index b55c288..ffdb09d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.runtime.tasks;
import akka.dispatch.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
@@ -51,9 +55,12 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
@@ -65,16 +72,20 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.concurrent.Await;
@@ -90,7 +101,9 @@ import java.net.URL;
import java.util.Collections;
import java.util.Comparator;
import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -98,10 +111,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
-public class StreamTaskTest {
+public class StreamTaskTest extends TestLogger {
private static OneShotLatch SYNC_LATCH;
@@ -170,8 +187,8 @@ public class StreamTaskTest {
task.getExecutingThread().join();
// ensure that the state backends are closed
- Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
- Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+ verify(StateBackendTestSource.operatorStateBackend).close();
+ verify(StateBackendTestSource.keyedStateBackend).close();
assertEquals(ExecutionState.FINISHED, task.getExecutionState());
}
@@ -194,8 +211,8 @@ public class StreamTaskTest {
task.getExecutingThread().join();
// ensure that the state backends are closed
- Mockito.verify(StateBackendTestSource.operatorStateBackend).close();
- Mockito.verify(StateBackendTestSource.keyedStateBackend).close();
+ verify(StateBackendTestSource.operatorStateBackend).close();
+ verify(StateBackendTestSource.keyedStateBackend).close();
assertEquals(ExecutionState.FAILED, task.getExecutionState());
}
@@ -240,6 +257,161 @@ public class StreamTaskTest {
assertEquals(ExecutionState.CANCELED, task.getExecutionState());
}
+ /**
+ * Tests that in case of a failing synchronous snapshot operation, the operator snapshot results
+ * which were already created are cancelled and all non partitioned state handles are discarded.
+ */
+ @Test
+ public void testFailingCheckpointStreamOperator() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+ Environment mockEnvironment = mock(Environment.class);
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+ // real StreamTask methods on a mock instance; its private fields are injected via Whitebox below
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ // the operators additionally implement StreamCheckpointedOperator, so the task is expected to
+ // write non partitioned state for each of them (see the output streams below)
+ StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+ OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+
+ final Exception testException = new Exception("Test exception");
+
+ // the first two operators produce snapshot results, the third one fails synchronously
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException);
+
+ StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ // each output stream yields a distinct state handle whose discarding is verified at the end
+ StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+ when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+ when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+ // consecutive stubbing: each createCheckpointStateOutputStream call returns the next stream
+ CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+ when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+ outStream1, outStream2, outStream3);
+
+ AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+ when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+ // inject the private fields directly instead of running the task's regular initialization
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+ try {
+ streamTask.triggerCheckpoint(checkpointMetaData);
+ fail("Expected test exception here.");
+ } catch (Exception e) {
+ // the task wraps the operator failure; its cause has to be the original test exception
+ assertEquals(testException, e.getCause());
+ }
+
+ // the snapshot results created before the failure are cancelled ...
+ verify(operatorSnapshotResult1).cancel();
+ verify(operatorSnapshotResult2).cancel();
+
+ // ... and all non partitioned state handles are discarded
+ verify(streamStateHandle1).discardState();
+ verify(streamStateHandle2).discardState();
+ verify(streamStateHandle3).discardState();
+ }
+
+ /**
+ * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are
+ * cancelled and all non partitioned state handles are discarded.
+ */
+ @Test
+ public void testFailingAsyncCheckpointRunnable() throws Exception {
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ TaskInfo mockTaskInfo = mock(TaskInfo.class);
+ when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
+ when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
+ Environment mockEnvironment = mock(Environment.class);
+ when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo);
+
+ // real StreamTask methods on a mock instance; its private fields are injected via Whitebox below
+ StreamTask<?, AbstractStreamOperator<?>> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS);
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+ streamTask.setEnvironment(mockEnvironment);
+
+ StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+ StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+
+ OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
+ OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
+ OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
+
+ // the raw operator state future of the third operator fails once its result is retrieved
+ RunnableFuture<OperatorStateHandle> failingFuture = mock(RunnableFuture.class);
+ when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception")));
+
+ when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
+
+ // in contrast to the synchronous failure test, all operators produce snapshot results here
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
+
+ StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
+
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ // each output stream yields a distinct state handle whose discarding is verified at the end
+ StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
+ StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+
+ when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
+ when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
+ when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+
+ // consecutive stubbing: each createCheckpointStateOutputStream call returns the next stream
+ CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
+ when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
+ outStream1, outStream2, outStream3);
+
+ AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
+ when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+
+ // inject the private fields directly instead of running the task's regular initialization
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ // a direct executor runs the submitted asynchronous work in the calling thread, so its outcome
+ // can be verified as soon as triggerCheckpoint returns
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", MoreExecutors.newDirectExecutorService());
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+
+ streamTask.triggerCheckpoint(checkpointMetaData);
+
+ // the asynchronous failure is reported via handleAsyncException instead of being thrown
+ verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
+
+ // all snapshot results, including the one of the failing operator, are cancelled ...
+ verify(operatorSnapshotResult1).cancel();
+ verify(operatorSnapshotResult2).cancel();
+ verify(operatorSnapshotResult3).cancel();
+
+ // ... and all non partitioned state handles are discarded
+ verify(streamStateHandle1).discardState();
+ verify(streamStateHandle2).discardState();
+ verify(streamStateHandle3).discardState();
+ }
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------