You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/01 14:51:34 UTC
[2/2] flink git commit: [FLINK-5218] [state backends] Eagerly close
pending FsCheckpointStateOutputStream on task cancellation
[FLINK-5218] [state backends] Eagerly close pending FsCheckpointStateOutputStream on task cancellation
This fix contains two modifications:
1. State backends implement 'Closeable' and register themselves with the 'cancelables'
2. The FsStateBackend tracks all its unclosed FsCheckpointStateOutputStreams and closes them on 'close()'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f61bf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f61bf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f61bf6
Branch: refs/heads/release-1.1
Commit: 59f61bf6cb8351cec9369e2de39c6eeffbda10ea
Parents: e475eb2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 22:38:23 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Dec 1 15:09:43 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 86 ++++--
.../runtime/state/AbstractStateBackend.java | 58 +++-
.../state/filesystem/FsStateBackend.java | 159 ++++++++--
.../state/memory/MemoryStateBackend.java | 2 +-
.../FsCheckpointStateOutputStreamTest.java | 132 ++++++++-
.../state/FsStateBackendClosingTest.java | 65 +++++
.../streaming/runtime/tasks/StreamTask.java | 7 +
.../runtime/tasks/BlockingCheckpointsTest.java | 290 +++++++++++++++++++
.../streaming/runtime/StateBackendITCase.java | 2 +-
9 files changed, 731 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index deba9f9..0412a4a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -312,50 +312,74 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
- public void dispose() {
- super.dispose();
- nonPartitionedStateBackend.dispose();
+ public void dispose() throws Exception {
+ Throwable exception = null;
- // we have to lock because we might have an asynchronous checkpoint going on
- synchronized (dbCleanupLock) {
- if (db != null) {
- if (this.dbOptions != null) {
- this.dbOptions.dispose();
- this.dbOptions = null;
- }
-
- for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
- column.f0.dispose();
- }
+ // make sure the individual states are disposed
+ try {
+ super.dispose();
+ }
+ catch (Throwable t) {
+ exception = t;
+ }
- db.dispose();
- db = null;
+ try {
+ nonPartitionedStateBackend.dispose();
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
}
}
- }
-
- @Override
- public void close() throws Exception {
- nonPartitionedStateBackend.close();
// we have to lock because we might have an asynchronous checkpoint going on
- synchronized (dbCleanupLock) {
- if (db != null) {
- if (this.dbOptions != null) {
- this.dbOptions.dispose();
- this.dbOptions = null;
- }
+ // this must also happen in any case, regardless of earlier exceptions
+ try {
+ synchronized (dbCleanupLock) {
+ if (db != null) {
+ if (this.dbOptions != null) {
+ this.dbOptions.dispose();
+ this.dbOptions = null;
+ }
+
+ for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+ column.f0.dispose();
+ }
- for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
- column.f0.dispose();
+ db.dispose();
+ db = null;
}
+ }
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
+ }
+ }
- db.dispose();
- db = null;
+ if (exception != null) {
+ if (exception instanceof Exception) {
+ throw (Exception) exception;
+ } else if (exception instanceof Error) {
+ throw (Error) exception;
+ } else {
+ throw new Exception(exception.getMessage(), exception);
}
}
}
+ @Override
+ public void close() throws IOException {
+ // we only close all I/O streams here and do not yet dispose of the native resources
+ // otherwise this can lead to SEGFAULT problems
+ // native resources must only be released in the 'dispose()' method.
+ nonPartitionedStateBackend.close();
+ }
+
private File getDbPath(String stateName) {
return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index b86688b..ab9854c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.execution.Environment;
+import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
@@ -51,8 +52,8 @@ import java.util.Map;
/**
* A state backend defines how state is stored and snapshotted during checkpoints.
*/
-public abstract class AbstractStateBackend implements java.io.Serializable {
-
+public abstract class AbstractStateBackend implements java.io.Serializable, Closeable {
+
private static final long serialVersionUID = 4620413814639220247L;
protected transient TypeSerializer<?> keySerializer;
@@ -102,23 +103,61 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
public abstract void disposeAllStateForCurrentJob() throws Exception;
/**
- * Closes the state backend, releasing all internal resources, but does not delete any persistent
- * checkpoint data.
+ * Closes the state backend, dropping and aborting all I/O operations that are currently
+ * pending.
*
- * @throws Exception Exceptions can be forwarded and will be logged by the system
+ * @throws IOException Exceptions can be forwarded and will be logged by the system
*/
- public abstract void close() throws Exception;
+ public abstract void close() throws IOException;
+
+ /**
+ * Releases all resources held by this state backend.
+ *
+ * <p>This method must make sure that all resources are disposed, even if an exception happens
+ * on the way.
+ *
+ * @throws Exception This method should report exceptions that occur.
+ */
+ public void dispose() throws Exception {
+ Throwable exception = null;
+
+ // make sure things are closed
+ try {
+ close();
+ }
+ catch (Throwable t) {
+ exception = t;
+ }
- public void dispose() {
+ // now actually dispose things
lastName = null;
lastState = null;
if (keyValueStates != null) {
for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
- state.dispose();
+ try {
+ state.dispose();
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
+ }
+ }
}
}
keyValueStates = null;
keyValueStatesByName = null;
+
+ if (exception != null) {
+ if (exception instanceof Exception) {
+ throw (Exception) exception;
+ } else if (exception instanceof Error) {
+ throw (Error) exception;
+ } else {
+ throw new Exception(exception.getMessage(), exception);
+ }
+ }
}
// ------------------------------------------------------------------------
@@ -444,6 +483,9 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
* @throws IOException Thrown, if the stream cannot be closed.
*/
public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+ // we do not flush() here, because that forces the 'CheckpointStateOutputStream' to files,
+ // even when it could stay in a 'small chunk' memory handle.
+ // the 'DataOutputViewStreamWrapper' does not buffer data anyways
return new DataInputViewHandle(out.closeAndGetHandle());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 8a8a26d..446f3ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -33,9 +33,10 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.AbstractStateBackend;
-
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +45,13 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The file state backend is a state backend that stores the state of streaming jobs in a file system.
*
@@ -86,6 +91,10 @@ public class FsStateBackend extends AbstractStateBackend {
/** Cached handle to the file system for file operations */
private transient FileSystem filesystem;
+ /** Set of currently open streams */
+ private transient HashSet<FsCheckpointStateOutputStream> openStreams;
+
+ private transient volatile boolean closed;
/**
* Creates a new state backend that stores its checkpoint data in the file system and location
@@ -236,9 +245,11 @@ public class FsStateBackend extends AbstractStateBackend {
// ------------------------------------------------------------------------
@Override
- public void initializeForJob(Environment env,
- String operatorIdentifier,
- TypeSerializer<?> keySerializer) throws Exception {
+ public void initializeForJob(
+ Environment env,
+ String operatorIdentifier,
+ TypeSerializer<?> keySerializer) throws Exception {
+
super.initializeForJob(env, operatorIdentifier, keySerializer);
Path dir = new Path(basePath, env.getJobID().toString());
@@ -249,6 +260,7 @@ public class FsStateBackend extends AbstractStateBackend {
filesystem.mkdirs(dir);
checkpointDirectory = dir;
+ openStreams = new HashSet<>();
}
@Override
@@ -267,7 +279,42 @@ public class FsStateBackend extends AbstractStateBackend {
}
@Override
- public void close() throws Exception {}
+ public void close() throws IOException {
+ closed = true;
+
+ // cache a copy on the heap for safety
+ final HashSet<FsCheckpointStateOutputStream> openStreams = this.openStreams;
+ if (openStreams != null) {
+
+ // we need to draw a copy of the set, since the closing concurrently modifies the set
+ final ArrayList<FsCheckpointStateOutputStream> streams;
+
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (openStreams) {
+ streams = new ArrayList<>(openStreams);
+ openStreams.clear();
+ }
+
+ // close all the streams, collect exceptions and record all but the first as suppressed
+ Throwable exception = null;
+ for (FsCheckpointStateOutputStream stream : streams) {
+ try {
+ stream.close();
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ } else {
+ exception.addSuppressed(t);
+ }
+ }
+ }
+
+ if (exception != null) {
+ ExceptionUtils.rethrowIOException(exception);
+ }
+ }
+ }
// ------------------------------------------------------------------------
// state backend operations
@@ -299,15 +346,22 @@ public class FsStateBackend extends AbstractStateBackend {
S state, long checkpointID, long timestamp) throws Exception
{
checkFileSystemInitialized();
-
+
Path checkpointDir = createCheckpointDirPath(checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
- FsCheckpointStateOutputStream stream =
- new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
-
- try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
+ try (FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold))
+ {
+ // perform the closing double-check AFTER! the creation of the stream
+ if (closed) {
+ throw new IOException("closed");
+ }
+
+ ObjectOutputStream os = new ObjectOutputStream(stream);
os.writeObject(state);
+ os.flush();
+
return stream.closeAndGetHandle().toSerializableHandle();
}
}
@@ -318,7 +372,16 @@ public class FsStateBackend extends AbstractStateBackend {
Path checkpointDir = createCheckpointDirPath(checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
- return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
+ FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ checkpointDir, filesystem, openStreams, bufferSize, fileStateThreshold);
+
+ // perform the closing double-check AFTER! the creation of the stream
+ if (closed) {
+ stream.close();
+ throw new IOException("closed");
+ }
+
+ return stream;
}
// ------------------------------------------------------------------------
@@ -426,29 +489,40 @@ public class FsStateBackend extends AbstractStateBackend {
private int pos;
private FSDataOutputStream outStream;
-
+
private final int localStateThreshold;
private final Path basePath;
private final FileSystem fs;
-
+
+ private final HashSet<FsCheckpointStateOutputStream> openStreams;
+
private Path statePath;
-
- private boolean closed;
+
+ private volatile boolean closed;
public FsCheckpointStateOutputStream(
- Path basePath, FileSystem fs,
- int bufferSize, int localStateThreshold)
+ Path basePath,
+ FileSystem fs,
+ HashSet<FsCheckpointStateOutputStream> openStreams,
+ int bufferSize,
+ int localStateThreshold)
{
if (bufferSize < localStateThreshold) {
throw new IllegalArgumentException();
}
-
+
this.basePath = basePath;
this.fs = fs;
+ this.openStreams = checkNotNull(openStreams);
this.writeBuffer = new byte[bufferSize];
this.localStateThreshold = localStateThreshold;
+
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (openStreams) {
+ openStreams.add(this);
+ }
}
@@ -519,6 +593,9 @@ public class FsStateBackend extends AbstractStateBackend {
pos = 0;
}
}
+ else {
+ throw new IOException("stream is closed");
+ }
}
/**
@@ -530,6 +607,16 @@ public class FsStateBackend extends AbstractStateBackend {
public void close() {
if (!closed) {
closed = true;
+
+ // make sure that the write() methods cannot succeed any more
+ pos = writeBuffer.length;
+
+ // remove the stream from the open streams set
+ synchronized (openStreams) {
+ openStreams.remove(this);
+ }
+
+ // close all resources
if (outStream != null) {
try {
outStream.close();
@@ -551,15 +638,24 @@ public class FsStateBackend extends AbstractStateBackend {
public StreamStateHandle closeAndGetHandle() throws IOException {
synchronized (this) {
if (!closed) {
+
+ // remove the stream from the open streams set
+ synchronized (openStreams) {
+ openStreams.remove(this);
+ }
+
+ // close all resources
if (outStream == null && pos <= localStateThreshold) {
closed = true;
byte[] bytes = Arrays.copyOf(writeBuffer, pos);
+ pos = writeBuffer.length;
return new ByteStreamStateHandle(bytes);
}
else {
flush();
outStream.close();
closed = true;
+ pos = writeBuffer.length;
return new FileStreamStateHandle(statePath);
}
}
@@ -577,9 +673,17 @@ public class FsStateBackend extends AbstractStateBackend {
public Path closeAndGetPath() throws IOException {
synchronized (this) {
if (!closed) {
- closed = true;
+
+ // remove the stream from the open streams set
+ synchronized (openStreams) {
+ openStreams.remove(this);
+ }
+
+ // close all resources
flush();
outStream.close();
+ closed = true;
+ pos = writeBuffer.length;
return statePath;
}
else {
@@ -587,5 +691,22 @@ public class FsStateBackend extends AbstractStateBackend {
}
}
}
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ // we need referential identity on the streams for the closing set to work
+ // properly, so we implement that via final methods here
+
+ @Override
+ public final int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public final boolean equals(Object obj) {
+ return this == obj;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..b155244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -78,7 +78,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
}
@Override
- public void close() throws Exception {}
+ public void close() {}
// ------------------------------------------------------------------------
// State backend operations
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
index 5964b72..3aba9e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
@@ -22,34 +22,36 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
+import java.util.HashSet;
import java.util.Random;
import static org.junit.Assert.*;
public class FsCheckpointStateOutputStreamTest {
-
+
/** The temp dir, obtained in a platform neutral way */
private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());
-
-
+
+
@Test(expected = IllegalArgumentException.class)
public void testWrongParameters() {
// this should fail
new FsStateBackend.FsCheckpointStateOutputStream(
- TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 4000, 5000);
}
-
@Test
public void testEmptyState() throws Exception {
AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(
- TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), new HashSet<FsCheckpointStateOutputStream>(), 1024, 512);
StreamStateHandle handle = stream.closeAndGetHandle();
assertTrue(handle instanceof ByteStreamStateHandle);
@@ -57,7 +59,101 @@ public class FsCheckpointStateOutputStreamTest {
InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader());
assertEquals(-1, inStream.read());
}
-
+
+ @Test
+ public void testCloseAndGetPath() throws Exception {
+ FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH,
+ FileSystem.getLocalFileSystem(),
+ new HashSet<FsCheckpointStateOutputStream>(),
+ 1024,
+ 512);
+
+ stream.write(1);
+
+ Path path = stream.closeAndGetPath();
+ assertNotNull(path);
+
+ // cleanup
+ FileSystem.getLocalFileSystem().delete(path, false);
+ }
+
+ @Test
+ public void testWriteFailsFastWhenClosed() throws Exception {
+ final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+ FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ assertFalse(stream1.isClosed());
+ assertFalse(stream2.isClosed());
+ assertFalse(stream3.isClosed());
+
+ // simple close
+ stream1.close();
+
+ // close with handle
+ StreamStateHandle handle = stream2.closeAndGetHandle();
+
+ // close with path
+ Path path = stream3.closeAndGetPath();
+
+ assertTrue(stream1.isClosed());
+ assertTrue(stream2.isClosed());
+ assertTrue(stream3.isClosed());
+
+ validateStreamNotWritable(stream1);
+ validateStreamNotWritable(stream2);
+ validateStreamNotWritable(stream3);
+
+ // clean up
+ handle.discardState();
+ FileSystem.getLocalFileSystem().delete(path, false);
+ }
+
+ @Test
+ public void testAddAndRemoveFromOpenStreamsSet() throws Exception {
+ final HashSet<FsCheckpointStateOutputStream> openStreams = new HashSet<>();
+
+ FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ FsCheckpointStateOutputStream stream3 = new FsCheckpointStateOutputStream(
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), openStreams, 1024, 512);
+
+ assertTrue(openStreams.contains(stream1));
+ assertTrue(openStreams.contains(stream2));
+ assertTrue(openStreams.contains(stream3));
+ assertEquals(3, openStreams.size());
+
+ // simple close
+ stream1.close();
+
+ // close with handle
+ StreamStateHandle handle = stream2.closeAndGetHandle();
+
+ // close with path
+ Path path = stream3.closeAndGetPath();
+
+ assertFalse(openStreams.contains(stream1));
+ assertFalse(openStreams.contains(stream2));
+ assertFalse(openStreams.contains(stream3));
+ assertEquals(0, openStreams.size());
+
+ // clean up
+ handle.discardState();
+ FileSystem.getLocalFileSystem().delete(path, false);
+ }
+
@Test
public void testStateBlowMemThreshold() throws Exception {
runTest(222, 999, 512, false);
@@ -72,16 +168,17 @@ public class FsCheckpointStateOutputStreamTest {
public void testStateAboveMemThreshold() throws Exception {
runTest(576446, 259, 17, true);
}
-
+
@Test
public void testZeroThreshold() throws Exception {
runTest(16678, 4096, 0, true);
}
-
+
private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
AbstractStateBackend.CheckpointStateOutputStream stream =
new FsStateBackend.FsCheckpointStateOutputStream(
- TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);
+ TEMP_DIR_PATH, FileSystem.getLocalFileSystem(),
+ new HashSet<FsCheckpointStateOutputStream>(), bufferSize, threshold);
Random rnd = new Random();
byte[] original = new byte[numBytes];
@@ -125,4 +222,19 @@ public class FsCheckpointStateOutputStreamTest {
handle.discardState();
}
+
+ private void validateStreamNotWritable(FsCheckpointStateOutputStream stream) {
+ try {
+ stream.write(1);
+ fail();
+ } catch (IOException e) {
+ // expected
+ }
+ try {
+ stream.write(new byte[4], 1, 2);
+ fail();
+ } catch (IOException e) {
+ // expected
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
new file mode 100644
index 0000000..6df488d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsStateBackendClosingTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend.FsCheckpointStateOutputStream;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FsStateBackendClosingTest {
+
+ @Test
+ public void testStateBackendClosesStreams() throws Exception {
+ final URI tempFolder = new File(EnvironmentInformation.getTemporaryFileDirectory()).toURI();
+ final FsStateBackend backend = new FsStateBackend(tempFolder);
+
+ backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+ FsCheckpointStateOutputStream stream = backend.createCheckpointStateOutputStream(17L, 12345L);
+
+ // stream is open, this should succeed
+ assertFalse(stream.isClosed());
+ stream.write(1);
+
+ // close the backend - that should close the stream
+ backend.close();
+
+ assertTrue(stream.isClosed());
+
+ try {
+ stream.write(2);
+ fail("stream is closed, 'write(int)' should fail with an exception");
+ }
+ catch (IOException e) {
+ // expected
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aaaead0..99df060 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -772,7 +772,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
}
+
stateBackend.initializeForJob(getEnvironment(), operatorIdentifier, keySerializer);
+
+ // make sure the state backend is closed eagerly in case of cancellation
+ synchronized (cancelables) {
+ cancelables.add(stateBackend);
+ }
+
return stateBackend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
new file mode 100644
index 0000000..ed993c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This test checks that task checkpoints that block and do not react to thread interrupts
+ * are
+ */
+public class BlockingCheckpointsTest {
+
+ private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();
+
+ @Test
+ public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+
+ Configuration taskConfig = new Configuration();
+ StreamConfig cfg = new StreamConfig(taskConfig);
+ cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ cfg.setStreamOperator(new TestOperator());
+ cfg.setStateBackend(new LockingStreamStateBackend());
+
+ Task task = createTask(taskConfig);
+
+ // start the task and wait until it is in "restore"
+ task.startTaskThread();
+ IN_CHECKPOINT_LATCH.await();
+
+ // cancel the task and wait. unless cancellation properly closes
+ // the streams, this will never terminate
+ task.cancelExecution();
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+ assertNull(task.getFailureCause());
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static Task createTask(Configuration taskConfig) throws IOException {
+
+ JobInformation jobInformation = new JobInformation(
+ new JobID(),
+ "test job name",
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList());
+
+ TaskInformation taskInformation = new TaskInformation(
+ new JobVertexID(),
+ "test task name",
+ 1,
+ TestStreamTask.class.getName(),
+ taskConfig);
+
+ return new Task(
+ jobInformation,
+ taskInformation,
+ new ExecutionAttemptID(),
+ 0,
+ 0,
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ 0,
+ null,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ mock(BroadcastVariableManager.class),
+ mock(ActorGateway.class),
+ mock(ActorGateway.class),
+ new FiniteDuration(10, TimeUnit.SECONDS),
+ new FallbackLibraryCacheManager(),
+ new FileCache(new Configuration()),
+ new TaskManagerRuntimeInfo(
+ "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+ new UnregisteredTaskMetricsGroup());
+
+ }
+
+ // ------------------------------------------------------------------------
+ // state backend with locking output stream
+ // ------------------------------------------------------------------------
+
+ private static class LockingStreamStateBackend extends AbstractStateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LockingOutputStream out = new LockingOutputStream();
+
+ @Override
+ public void disposeAllStateForCurrentJob() {}
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+ return out;
+ }
+
+ @Override
+ protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, long checkpointID, long timestamp) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static final class LockingOutputStream extends CheckpointStateOutputStream implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableObject lock = new SerializableObject();
+ private volatile boolean closed;
+
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ // this needs to not react to interrupts until the handle is closed
+ synchronized (lock) {
+ while (!closed) {
+ try {
+ lock.wait();
+ }
+ catch (InterruptedException ignored) {}
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ closed = true;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test source operator that calls into the locking checkpoint output stream
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static final class TestOperator extends StreamFilter<Object> {
+ private static final long serialVersionUID = 1L;
+
+ public TestOperator() {
+ super(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) {
+ return false;
+ }
+ });
+ }
+
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ AbstractStateBackend stateBackend = getStateBackend();
+ CheckpointStateOutputStream outStream = stateBackend.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+ IN_CHECKPOINT_LATCH.trigger();
+
+ // this should lock
+ outStream.write(1);
+
+ // this should be unreachable
+ return null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // stream task that simply triggers a checkpoint
+ // ------------------------------------------------------------------------
+
+ public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
+
+ @Override
+ public void init() {}
+
+ @Override
+ protected void run() throws Exception {
+ triggerCheckpointOnBarrier(11L, System.currentTimeMillis());
+ }
+
+ @Override
+ protected void cleanup() {}
+
+ @Override
+ protected void cancelTask() {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59f61bf6/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..4eb8b4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -104,7 +104,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
public void disposeAllStateForCurrentJob() throws Exception {}
@Override
- public void close() throws Exception {}
+ public void close() {}
@Override
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {