You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/09/25 15:47:51 UTC
[2/2] flink git commit: [FLINK-7619] Improved abstraction of
AbstractAsyncIOCallable to better fit the current usage pattern.
[FLINK-7619] Improved abstraction of AbstractAsyncIOCallable to better fit the current usage pattern.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5af463a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5af463a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5af463a9
Branch: refs/heads/master
Commit: 5af463a9c0ff62603bc342a78dfd5483d834e8a7
Parents: 0073204
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Sep 7 11:24:12 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Sep 25 16:04:15 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 198 +++++++++++--------
.../apache/flink/core/fs/CloseableRegistry.java | 4 +-
.../flink/core/fs/ClosingFSDataInputStream.java | 4 +-
.../core/fs/ClosingFSDataOutputStream.java | 4 +-
.../core/fs/SafetyNetCloseableRegistry.java | 8 +-
.../flink/util/AbstractCloseableRegistry.java | 11 +-
.../core/fs/AbstractCloseableRegistryTest.java | 11 +-
.../flink/core/fs/CloseableRegistryTest.java | 2 +-
.../AbstractAsyncSnapshotIOCallable.java | 109 ----------
.../AbstractAsyncCallableWithResources.java | 194 ++++++++++++++++++
.../io/async/AbstractAsyncIOCallable.java | 157 ---------------
.../flink/runtime/io/async/AsyncStoppable.java | 4 +-
.../state/DefaultOperatorStateBackend.java | 66 +++++--
.../state/StateInitializationContextImpl.java | 17 +-
.../StateSnapshotContextSynchronousImpl.java | 25 +--
.../state/heap/HeapKeyedStateBackend.java | 85 +++++---
.../streaming/runtime/tasks/StreamTask.java | 8 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 2 +-
.../util/AbstractStreamOperatorTestHarness.java | 2 +-
...tractEventTimeWindowCheckpointingITCase.java | 3 +-
20 files changed, 468 insertions(+), 446 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index dd5b852..a1500c7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -43,7 +43,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -384,8 +384,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final CheckpointStreamFactory streamFactory) throws Exception {
long startTime = System.currentTimeMillis();
+ final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
+
+ final RocksDBFullSnapshotOperation<K> snapshotOperation;
- final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory);
// hold the db lock while operation on the db to guard us against async db disposal
synchronized (asyncSnapshotLock) {
@@ -399,6 +401,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return DoneFuture.nullValue();
}
+ snapshotOperation =
+ new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
+
snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
} else {
throw new IOException("RocksDB closed.");
@@ -406,30 +411,55 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
// implementation of the async IO operation, based on FutureTask
- AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
- new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+ AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
+ new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
@Override
- public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+ protected void acquireResources() throws Exception {
+ cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
snapshotOperation.openCheckpointStream();
- return snapshotOperation.getOutStream();
}
@Override
- public KeyGroupsStateHandle performOperation() throws Exception {
- long startTime = System.currentTimeMillis();
+ protected void releaseResources() throws Exception {
+ closeLocalRegistry();
+ releaseSnapshotOperationResources();
+ }
+
+ private void releaseSnapshotOperationResources() {
+ // hold the db lock while operation on the db to guard us against async db disposal
synchronized (asyncSnapshotLock) {
+ snapshotOperation.releaseSnapshotResources();
+ }
+ }
+
+ @Override
+ protected void stopOperation() throws Exception {
+ closeLocalRegistry();
+ }
+
+ private void closeLocalRegistry() {
+ if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
try {
- // hold the db lock while operation on the db to guard us against async db disposal
- if (db == null) {
- throw new IOException("RocksDB closed.");
- }
+ snapshotCloseableRegistry.close();
+ } catch (Exception ex) {
+ LOG.warn("Error closing local registry", ex);
+ }
+ }
+ }
- snapshotOperation.writeDBSnapshot();
+ @Override
+ public KeyGroupsStateHandle performOperation() throws Exception {
+ long startTime = System.currentTimeMillis();
- } finally {
- snapshotOperation.closeCheckpointStream();
+ synchronized (asyncSnapshotLock) {
+ // hold the db lock while operation on the db to guard us against async db disposal
+ if (db == null) {
+ throw new IOException("RocksDB closed.");
}
+
+ snapshotOperation.writeDBSnapshot();
+ snapshotOperation.createSnapshotResultStateHandleFromOutputStream();
}
LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
@@ -437,18 +467,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return snapshotOperation.getSnapshotResultStateHandle();
}
-
- private void releaseSnapshotOperationResources(boolean canceled) {
- // hold the db lock while operation on the db to guard us against async db disposal
- synchronized (asyncSnapshotLock) {
- snapshotOperation.releaseSnapshotResources(canceled);
- }
- }
-
- @Override
- public void done(boolean canceled) {
- releaseSnapshotOperationResources(canceled);
- }
};
LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
@@ -468,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final RocksDBKeyedStateBackend<K> stateBackend;
private final KeyGroupRangeOffsets keyGroupRangeOffsets;
private final CheckpointStreamFactory checkpointStreamFactory;
+ private final CloseableRegistry snapshotCloseableRegistry;
private long checkpointId;
private long checkpointTimeStamp;
@@ -482,11 +501,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksDBFullSnapshotOperation(
RocksDBKeyedStateBackend<K> stateBackend,
- CheckpointStreamFactory checkpointStreamFactory) {
+ CheckpointStreamFactory checkpointStreamFactory,
+ CloseableRegistry registry) {
this.stateBackend = stateBackend;
this.checkpointStreamFactory = checkpointStreamFactory;
this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
+ this.snapshotCloseableRegistry = registry;
}
/**
@@ -510,9 +531,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
public void openCheckpointStream() throws Exception {
Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
- outStream = checkpointStreamFactory.
- createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
- stateBackend.cancelStreamRegistry.registerClosable(outStream);
+ outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+ snapshotCloseableRegistry.registerCloseable(outStream);
outputView = new DataOutputViewStreamWrapper(outStream);
}
@@ -537,18 +557,25 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*
* @throws IOException
*/
- public void closeCheckpointStream() throws IOException {
- if (outStream != null) {
- snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
- } else {
- snapshotResultStateHandle = null;
+ public void createSnapshotResultStateHandleFromOutputStream() throws IOException {
+
+ if (snapshotCloseableRegistry.unregisterCloseable(outStream)) {
+
+ StreamStateHandle stateHandle = outStream.closeAndGetHandle();
+ outStream = null;
+
+ if (stateHandle != null) {
+ this.snapshotResultStateHandle = new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ }
}
}
/**
* 5) Release the snapshot object for RocksDB and clean up.
*/
- public void releaseSnapshotResources(boolean canceled) {
+ public void releaseSnapshotResources() {
+
+ outStream = null;
if (null != kvStateIterators) {
for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
@@ -569,12 +596,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
IOUtils.closeQuietly(readOptions);
readOptions = null;
}
+ }
- if (canceled) {
+ /**
+ * Drop the created snapshot if we have ben cancelled.
+ */
+ public void dropSnapshotResult() {
+ if (null != snapshotResultStateHandle) {
try {
- if (null != snapshotResultStateHandle) {
- snapshotResultStateHandle.discardState();
- }
+ snapshotResultStateHandle.discardState();
} catch (Exception e) {
LOG.warn("Exception occurred during snapshot state handle cleanup.", e);
}
@@ -719,13 +749,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException {
- stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
- StreamStateHandle stateHandle = outStream.closeAndGetHandle();
- outStream = null;
- return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
- }
-
private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
@@ -805,11 +828,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
FileSystem backupFileSystem = backupPath.getFileSystem();
inputStream = backupFileSystem.open(filePath);
- closeableRegistry.registerClosable(inputStream);
+ closeableRegistry.registerCloseable(inputStream);
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
- closeableRegistry.registerClosable(outputStream);
+ closeableRegistry.registerCloseable(outputStream);
while (true) {
int numBytes = inputStream.read(buffer);
@@ -821,19 +844,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
outputStream.write(buffer, 0, numBytes);
}
- closeableRegistry.unregisterClosable(outputStream);
- StreamStateHandle result = outputStream.closeAndGetHandle();
- outputStream = null;
-
+ StreamStateHandle result = null;
+ if (closeableRegistry.unregisterCloseable(outputStream)) {
+ result = outputStream.closeAndGetHandle();
+ outputStream = null;
+ }
return result;
+
} finally {
- if (inputStream != null) {
- closeableRegistry.unregisterClosable(inputStream);
+ if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
- if (outputStream != null) {
- closeableRegistry.unregisterClosable(outputStream);
+ if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
@@ -845,7 +868,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
- closeableRegistry.registerClosable(outputStream);
+ closeableRegistry.registerCloseable(outputStream);
//no need for compression scheme support because sst-files are already compressed
KeyedBackendSerializationProxy<K> serializationProxy =
@@ -858,15 +881,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.write(out);
- closeableRegistry.unregisterClosable(outputStream);
- StreamStateHandle result = outputStream.closeAndGetHandle();
- outputStream = null;
-
+ StreamStateHandle result = null;
+ if (closeableRegistry.unregisterCloseable(outputStream)) {
+ result = outputStream.closeAndGetHandle();
+ outputStream = null;
+ }
return result;
} finally {
if (outputStream != null) {
- closeableRegistry.unregisterClosable(outputStream);
- outputStream.close();
+ if (closeableRegistry.unregisterCloseable(outputStream)) {
+ outputStream.close();
+ }
}
}
}
@@ -905,7 +930,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyedStateHandle materializeSnapshot() throws Exception {
- stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
+ stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
// write meta data
metaStateHandle = materializeMetaData();
@@ -954,15 +979,25 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
void stop() {
- try {
- closeableRegistry.close();
- } catch (IOException e) {
- LOG.warn("Could not properly close io streams.", e);
+
+ if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+ try {
+ closeableRegistry.close();
+ } catch (IOException e) {
+ LOG.warn("Could not properly close io streams.", e);
+ }
}
}
void releaseResources(boolean canceled) {
- stateBackend.cancelStreamRegistry.unregisterClosable(closeableRegistry);
+
+ if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+ try {
+ closeableRegistry.close();
+ } catch (IOException e) {
+ LOG.warn("Exception on closing registry.", e);
+ }
+ }
if (backupPath != null) {
try {
@@ -1128,13 +1163,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throws IOException, StateMigrationException, RocksDBException {
try {
currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
- rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
+ rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
restoreKVStateData();
} finally {
- if (currentStateHandleInStream != null) {
- rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
+ if (currentStateHandleInStream != null
+ && rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
@@ -1275,7 +1310,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
inputStream = metaStateHandle.openInputStream();
- stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+ stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
KeyedBackendSerializationProxy<T> serializationProxy =
new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
@@ -1298,8 +1333,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return serializationProxy.getStateMetaInfoSnapshots();
} finally {
- if (inputStream != null) {
- stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
}
@@ -1316,10 +1350,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
inputStream = remoteFileHandle.openInputStream();
- stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+ stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
- stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+ stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
byte[] buffer = new byte[8 * 1024];
while (true) {
@@ -1331,13 +1365,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
outputStream.write(buffer, 0, numBytes);
}
} finally {
- if (inputStream != null) {
- stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
- if (outputStream != null) {
- stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 29f363c..87d33d2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -49,7 +49,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
}
@Override
- protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
- closeableMap.remove(closeable);
+ protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
+ return closeableMap.remove(closeable) != null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
index 7c97271..173a890 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataInputStream.java
@@ -55,7 +55,7 @@ public class ClosingFSDataInputStream
public void close() throws IOException {
if (!closed) {
closed = true;
- registry.unregisterClosable(this);
+ registry.unregisterCloseable(this);
inputStream.close();
}
}
@@ -93,7 +93,7 @@ public class ClosingFSDataInputStream
FSDataInputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException{
ClosingFSDataInputStream inputStream = new ClosingFSDataInputStream(delegate, registry, debugInfo);
- registry.registerClosable(inputStream);
+ registry.registerCloseable(inputStream);
return inputStream;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
index c517a83..cb7de92 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/ClosingFSDataOutputStream.java
@@ -60,7 +60,7 @@ public class ClosingFSDataOutputStream
public void close() throws IOException {
if (!closed) {
closed = true;
- registry.unregisterClosable(this);
+ registry.unregisterCloseable(this);
outputStream.close();
}
}
@@ -98,7 +98,7 @@ public class ClosingFSDataOutputStream
FSDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugInfo) throws IOException {
ClosingFSDataOutputStream inputStream = new ClosingFSDataOutputStream(delegate, registry, debugInfo);
- registry.registerClosable(inputStream);
+ registry.registerCloseable(inputStream);
return inputStream;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 6097334..9c4272f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -98,7 +98,7 @@ public class SafetyNetCloseableRegistry extends
}
@Override
- protected void doUnRegister(
+ protected boolean doUnRegister(
@Nonnull WrappingProxyCloseable<? extends Closeable> closeable,
@Nonnull Map<Closeable, PhantomDelegatingCloseableRef> closeableMap) {
@@ -106,11 +106,7 @@ public class SafetyNetCloseableRegistry extends
Closeable innerCloseable = WrappingProxyUtil.stripProxy(closeable.getWrappedDelegate());
- if (null == innerCloseable) {
- return;
- }
-
- closeableMap.remove(innerCloseable);
+ return null != innerCloseable && closeableMap.remove(innerCloseable) != null;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index 4527b5e..14e765c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -68,7 +68,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
* @param closeable Closeable tor register
* @throws IOException exception when the registry was closed before
*/
- public final void registerClosable(C closeable) throws IOException {
+ public final void registerCloseable(C closeable) throws IOException {
if (null == closeable) {
return;
@@ -89,15 +89,16 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
* Removes a {@link Closeable} from the registry.
*
* @param closeable instance to remove from the registry.
+ * @return true if the closeable was previously registered and became unregistered through this call.
*/
- public final void unregisterClosable(C closeable) {
+ public final boolean unregisterCloseable(C closeable) {
if (null == closeable) {
- return;
+ return false;
}
synchronized (getSynchronizationLock()) {
- doUnRegister(closeable, closeableToRef);
+ return doUnRegister(closeable, closeableToRef);
}
}
@@ -137,7 +138,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
* Does the actual un-registration of the closeable from the registry map. This should not do any long running or
* potentially blocking operations as is is executed under the registry's lock.
*/
- protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T> closeableMap);
+ protected abstract boolean doUnRegister(@Nonnull C closeable, @Nonnull Map<Closeable, T> closeableMap);
/**
* Returns the lock on which manipulations to members closeableToRef and closeable must be synchronized.
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
index 41b69c8..f9425f3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -91,7 +91,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
try {
- closeableRegistry.registerClosable(testCloseable);
+ closeableRegistry.registerCloseable(testCloseable);
Assert.fail("Closed registry should not accept closeables!");
@@ -120,7 +120,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
return null;
}).when(spyCloseable).close();
- closeableRegistry.registerClosable(spyCloseable);
+ closeableRegistry.registerCloseable(spyCloseable);
Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables());
@@ -138,7 +138,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
final C testCloseable = spy(createCloseable());
try {
- closeableRegistry.registerClosable(testCloseable);
+ closeableRegistry.registerCloseable(testCloseable);
Assert.fail("Closed registry should not accept closeables!");
}catch (IOException ignore) {
}
@@ -214,10 +214,7 @@ public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
@Override
public synchronized void close() throws IOException {
- if (refCount != null) {
- refCount.decrementAndGet();
- refCount = null;
- }
+ refCount.decrementAndGet();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
index eb8d1f4..c3bf6e6 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -52,7 +52,7 @@ public class CloseableRegistryTest extends AbstractCloseableRegistryTest<Closeab
@Override
protected void createAndRegisterStream() throws IOException {
TestStream testStream = new TestStream(unclosedCounter);
- registry.registerClosable(testStream);
+ registry.registerCloseable(testStream);
}
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
deleted file mode 100644
index 1aaa473..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Abstract base class for async IO operations of snapshots against a
- * {@link java.util.zip.CheckedOutputStream}. This includes participating in lifecycle management
- * through a {@link CloseableRegistry}.
- */
-public abstract class AbstractAsyncSnapshotIOCallable<H extends StateObject>
- extends AbstractAsyncIOCallable<H, CheckpointStreamFactory.CheckpointStateOutputStream> {
-
- protected final long checkpointId;
- protected final long timestamp;
-
- protected final CheckpointStreamFactory streamFactory;
- protected final CloseableRegistry closeStreamOnCancelRegistry;
- protected final AtomicBoolean open;
-
- public AbstractAsyncSnapshotIOCallable(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory streamFactory,
- CloseableRegistry closeStreamOnCancelRegistry) {
-
- this.streamFactory = Preconditions.checkNotNull(streamFactory);
- this.closeStreamOnCancelRegistry = Preconditions.checkNotNull(closeStreamOnCancelRegistry);
- this.checkpointId = checkpointId;
- this.timestamp = timestamp;
- this.open = new AtomicBoolean(false);
- }
-
- @Override
- public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
- if (checkStreamClosedAndDoTransitionToOpen()) {
- CheckpointStreamFactory.CheckpointStateOutputStream stream =
- streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
- try {
- closeStreamOnCancelRegistry.registerClosable(stream);
- return stream;
- } catch (Exception ex) {
- open.set(false);
- throw ex;
- }
- } else {
- throw new IOException("Async snapshot: a checkpoint stream was already opened.");
- }
- }
-
- @Override
- public void done(boolean canceled) {
- if (checkStreamOpenAndDoTransitionToClose()) {
- CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
- if (stream != null) {
- closeStreamOnCancelRegistry.unregisterClosable(stream);
- IOUtils.closeQuietly(stream);
- }
- }
- }
-
- protected boolean checkStreamClosedAndDoTransitionToOpen() {
- return open.compareAndSet(false, true);
- }
-
- protected boolean checkStreamOpenAndDoTransitionToClose() {
- return open.compareAndSet(true, false);
- }
-
- protected StreamStateHandle closeStreamAndGetStateHandle() throws IOException {
- if (checkStreamOpenAndDoTransitionToClose()) {
- final CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
- try {
- return stream.closeAndGetHandle();
- } finally {
- closeStreamOnCancelRegistry.unregisterClosable(stream);
- }
- } else {
- throw new IOException("Checkpoint stream already closed.");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
new file mode 100644
index 0000000..bc0116c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncCallableWithResources.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+
+/**
+ * This abstract class encapsulates the lifecycle and execution strategy for asynchronous operations that use resources.
+ *
+ * @param <V> return type of the asynchronous call.
+ */
+public abstract class AbstractAsyncCallableWithResources<V> implements StoppableCallbackCallable<V> {
+
+ /** Tracks if the stop method was called on this object. */
+ private volatile boolean stopped;
+
+ /** Tracks if call method was executed (only before stop calls). */
+ private volatile boolean called;
+
+ /** Stores a collected exception if there was one during stop. */
+ private volatile Exception stopException;
+
+ public AbstractAsyncCallableWithResources() {
+ this.stopped = false;
+ this.called = false;
+ }
+
+ /**
+ * This method implements the strategy for the actual IO operation:
+ * <p>
+ * 1) Acquire resources asynchronously and atomically w.r.t stopping.
+ * 2) Performs the operation
+ * 3) Releases resources.
+ *
+ * @return Result of the IO operation, e.g. a deserialized object.
+ * @throws Exception exception that happened during the call.
+ */
+ @Override
+ public final V call() throws Exception {
+
+ V result = null;
+ Exception collectedException = null;
+
+ try {
+ synchronized (this) {
+
+ if (stopped) {
+ throw new IOException("Task was already stopped.");
+ }
+
+ called = true;
+ // Get resources in async part, atomically w.r.t. stopping.
+ acquireResources();
+ }
+
+ // The main work is performed here.
+ result = performOperation();
+
+ } catch (Exception ex) {
+ collectedException = ex;
+ } finally {
+
+ try {
+ // Cleanup
+ releaseResources();
+ } catch (Exception relEx) {
+ collectedException = ExceptionUtils.firstOrSuppressed(relEx, collectedException);
+ }
+
+ if (collectedException != null) {
+ throw collectedException;
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Open the IO Handle (e.g. a stream) on which the operation will be performed.
+ *
+ * @return the opened IO handle that implements #Closeable
+ * @throws Exception if there was a problem in acquiring.
+ */
+ protected abstract void acquireResources() throws Exception;
+
+ /**
+ * Implements the actual operation.
+ *
+ * @return Result of the operation
+ * @throws Exception if there was a problem in executing the operation.
+ */
+ protected abstract V performOperation() throws Exception;
+
+ /**
+ * Releases resources acquired by this object.
+ *
+ * @throws Exception if there was a problem in releasing resources.
+ */
+ protected abstract void releaseResources() throws Exception;
+
+ /**
+ * This method implements how the operation is stopped. Usually this involves interrupting or closing some
+ * resources like streams to return from blocking calls.
+ *
+ * @throws Exception on problems during the stopping.
+ */
+ protected abstract void stopOperation() throws Exception;
+
+ /**
+ * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via
+ * #getStopException().
+ */
+ @Override
+ public final void stop() {
+
+ synchronized (this) {
+
+ // Make sure that call can not enter execution from here.
+ if (stopped) {
+ return;
+ } else {
+ stopped = true;
+ }
+ }
+
+ if (called) {
+ // Async call is executing -> attempt to stop it and releaseResources() will happen inside the async method.
+ try {
+ stopOperation();
+ } catch (Exception stpEx) {
+ this.stopException = stpEx;
+ }
+ } else {
+ // Async call was not executed, so we also need to releaseResources() here.
+ try {
+ releaseResources();
+ } catch (Exception relEx) {
+ stopException = relEx;
+ }
+ }
+ }
+
+ /**
+ * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because
+ * it finished or was stopped.
+ */
+ @Override
+ public void done(boolean canceled) {
+ //optional callback hook
+ }
+
+ /**
+ * True once the async method was called.
+ */
+ public boolean isCalled() {
+ return called;
+ }
+
+ /**
+ * Check if the IO operation is stopped
+ *
+ * @return true if stop() was called
+ */
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * Returns a potential exception that might have been observed while stopping the operation.
+ */
+ @Override
+ public Exception getStopException() {
+ return stopException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
deleted file mode 100644
index 1968d40..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.async;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * The abstract class encapsulates the lifecycle and execution strategy for asynchronous IO operations
- *
- * @param <V> return type of the asynchronous call
- * @param <D> type of the IO handle
- */
-public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements StoppableCallbackCallable<V> {
-
- private volatile boolean stopped;
-
- /**
- * Closable handle to IO, e.g. an InputStream
- */
- private volatile D ioHandle;
-
- /**
- * Stores exception that might happen during close
- */
- private volatile IOException stopException;
-
-
- public AbstractAsyncIOCallable() {
- this.stopped = false;
- }
-
- /**
- * This method implements the strategy for the actual IO operation:
- *
- * 1) Open the IO handle
- * 2) Perform IO operation
- * 3) Close IO handle
- *
- * @return Result of the IO operation, e.g. a deserialized object.
- * @throws Exception exception that happened during the call.
- */
- @Override
- public V call() throws Exception {
-
- synchronized (this) {
- if (isStopped()) {
- throw new IOException("Task was already stopped. No I/O handle opened.");
- }
-
- ioHandle = openIOHandle();
- }
-
- try {
-
- return performOperation();
-
- } finally {
- closeIOHandle();
- }
-
- }
-
- /**
- * Open the IO Handle (e.g. a stream) on which the operation will be performed.
- *
- * @return the opened IO handle that implements #Closeable
- * @throws Exception
- */
- protected abstract D openIOHandle() throws Exception;
-
- /**
- * Implements the actual IO operation on the opened IO handle.
- *
- * @return Result of the IO operation
- * @throws Exception
- */
- protected abstract V performOperation() throws Exception;
-
- /**
- * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via
- * #getStopException().
- */
- @Override
- public void stop() {
- closeIOHandle();
- }
-
- private synchronized void closeIOHandle() {
-
- if (!stopped) {
- stopped = true;
-
- final D handle = ioHandle;
- if (handle != null) {
- try {
- handle.close();
- } catch (IOException ex) {
- stopException = ex;
- }
- }
- }
- }
-
- /**
- * Returns the IO handle.
- * @return the IO handle
- */
- protected D getIoHandle() {
- return ioHandle;
- }
-
- /**
- * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because
- * it finished or was stopped.
- */
- @Override
- public void done(boolean canceled) {
- //optional callback hook
- }
-
- /**
- * Check if the IO operation is stopped
- *
- * @return true if stop() was called
- */
- @Override
- public boolean isStopped() {
- return stopped;
- }
-
- /**
- * Returns Exception that might happen on stop.
- *
- * @return Potential Exception that happened open stopping.
- */
- @Override
- public IOException getStopException() {
- return stopException;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
index 560e56a..8698600 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.async;
-import java.io.IOException;
-
/**
* An asynchronous operation that can be stopped.
*/
@@ -42,6 +40,6 @@ public interface AsyncStoppable {
*
* @return Exception that can happen during stop
*/
- IOException getStopException();
+ Exception getStopException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index b16ac06..1fb03d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.state;
-import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
@@ -34,11 +33,13 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
+
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -225,17 +226,37 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
// implementation of the async IO operation, based on FutureTask
- final AbstractAsyncSnapshotIOCallable<OperatorStateHandle> ioCallable =
- new AbstractAsyncSnapshotIOCallable<OperatorStateHandle>(
- checkpointId,
- timestamp,
- streamFactory,
- closeStreamOnCancelRegistry) {
+ final AbstractAsyncCallableWithResources<OperatorStateHandle> ioCallable =
+ new AbstractAsyncCallableWithResources<OperatorStateHandle>() {
+
+ CheckpointStreamFactory.CheckpointStateOutputStream out = null;
+
+ @Override
+ protected void acquireResources() throws Exception {
+ out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ closeStreamOnCancelRegistry.registerCloseable(out);
+ }
+
+ @Override
+ protected void releaseResources() throws Exception {
+ if (closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ @Override
+ protected void stopOperation() throws Exception {
+ if (closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+ IOUtils.closeQuietly(out);
+ }
+ }
@Override
public OperatorStateHandle performOperation() throws Exception {
long asyncStartTime = System.currentTimeMillis();
+ CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
+
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
new HashMap<>(registeredStatesDeepCopies.size());
@@ -246,8 +267,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
- CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
- DataOutputView dov = new DataOutputViewStreamWrapper(out);
+ DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(metaInfoSnapshots);
@@ -260,25 +280,30 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
registeredStatesDeepCopies.entrySet()) {
PartitionableListState<?> value = entry.getValue();
- long[] partitionOffsets = value.write(out);
+ long[] partitionOffsets = value.write(localOut);
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
- StreamStateHandle stateHandle = closeStreamAndGetStateHandle();
+ OperatorStateHandle retValue = null;
+
+ if (closeStreamOnCancelRegistry.unregisterCloseable(out)) {
+
+ StreamStateHandle stateHandle = out.closeAndGetHandle();
+
+ if (stateHandle != null) {
+ retValue = new OperatorStateHandle(writtenStatesMetaData, stateHandle);
+ }
+ }
if (asynchronousSnapshots) {
LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.",
streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
}
- if (stateHandle == null) {
- return null;
- }
-
- return new OperatorStateHandle(writtenStatesMetaData, stateHandle);
+ return retValue;
}
};
@@ -308,7 +333,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
FSDataInputStream in = stateHandle.openInputStream();
- closeStreamOnCancelRegistry.registerClosable(in);
+ closeStreamOnCancelRegistry.registerCloseable(in);
ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
@@ -370,8 +395,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
} finally {
Thread.currentThread().setContextClassLoader(restoreClassLoader);
- closeStreamOnCancelRegistry.unregisterClosable(in);
- IOUtils.closeQuietly(in);
+ if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
+ IOUtils.closeQuietly(in);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index 031d7c7..750d206 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -254,9 +254,10 @@ public class StateInitializationContextImpl implements StateInitializationContex
this.offsets = metaOffsets;
this.offPos = 0;
- closableRegistry.unregisterClosable(currentStream);
- IOUtils.closeQuietly(currentStream);
- currentStream = null;
+ if (closableRegistry.unregisterCloseable(currentStream)) {
+ IOUtils.closeQuietly(currentStream);
+ currentStream = null;
+ }
return true;
}
@@ -308,14 +309,18 @@ public class StateInitializationContextImpl implements StateInitializationContex
}
protected void openCurrentStream() throws IOException {
+
+ Preconditions.checkState(currentStream == null);
+
FSDataInputStream stream = currentStateHandle.openInputStream();
- closableRegistry.registerClosable(stream);
+ closableRegistry.registerCloseable(stream);
currentStream = stream;
}
protected void closeCurrentStream() {
- closableRegistry.unregisterClosable(currentStream);
- IOUtils.closeQuietly(currentStream);
+ if (closableRegistry.unregisterCloseable(currentStream)) {
+ IOUtils.closeQuietly(currentStream);
+ }
currentStream = null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index 5db0138..6a8a08f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -88,7 +88,7 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
CheckpointStreamFactory.CheckpointStateOutputStream cout =
streamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
- closableRegistry.registerClosable(cout);
+ closableRegistry.registerCloseable(cout);
return cout;
}
@@ -120,22 +120,25 @@ public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext
}
private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(
- NonClosingCheckpointOutputStream<T> stream) throws IOException {
- if (null == stream) {
+ NonClosingCheckpointOutputStream<T> stream) throws IOException {
+
+ if (null != stream && closableRegistry.unregisterCloseable(stream.getDelegate())) {
+ return stream.closeAndGetHandle();
+ } else {
return null;
}
-
- closableRegistry.unregisterClosable(stream.getDelegate());
-
- // for now we only support synchronous writing
- return stream.closeAndGetHandle();
}
- private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
+ private <T extends StreamStateHandle> void closeAndUnregisterStream(
+ NonClosingCheckpointOutputStream<T> stream) throws IOException {
+
Preconditions.checkNotNull(stream);
- closableRegistry.unregisterClosable(stream.getDelegate());
- stream.getDelegate().close();
+ CheckpointStreamFactory.CheckpointStateOutputStream delegate = stream.getDelegate();
+
+ if (closableRegistry.unregisterCloseable(delegate)) {
+ delegate.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index e235b96..bf92b34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -325,30 +325,56 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
//--------------------------------------------------- this becomes the end of sync part
// implementation of the async IO operation, based on FutureTask
- final AbstractAsyncSnapshotIOCallable<KeyedStateHandle> ioCallable =
- new AbstractAsyncSnapshotIOCallable<KeyedStateHandle>(
- checkpointId,
- timestamp,
- streamFactory,
- cancelStreamRegistry) {
+ final AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
+ new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
+
+ CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+ @Override
+ protected void acquireResources() throws Exception {
+ stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ cancelStreamRegistry.registerCloseable(stream);
+ }
+
+ @Override
+ protected void releaseResources() throws Exception {
+
+ if (cancelStreamRegistry.unregisterCloseable(stream)) {
+ IOUtils.closeQuietly(stream);
+ stream = null;
+ }
+
+ for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) {
+ tableSnapshot.release();
+ }
+ }
+
+ @Override
+ protected void stopOperation() throws Exception {
+ if (cancelStreamRegistry.unregisterCloseable(stream)) {
+ IOUtils.closeQuietly(stream);
+ stream = null;
+ }
+ }
@Override
public KeyGroupsStateHandle performOperation() throws Exception {
long asyncStartTime = System.currentTimeMillis();
- CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
- DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);
+ CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
+
+ DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
serializationProxy.write(outView);
long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
- keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
+ keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
outView.writeInt(keyGroupId);
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream);
+ OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream);
DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
@@ -356,21 +382,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- final StreamStateHandle streamStateHandle = closeStreamAndGetStateHandle();
+ if (cancelStreamRegistry.unregisterCloseable(stream)) {
- if (asynchronousSnapshots) {
- LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
- streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
- }
+ final StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
+ stream = null;
- if (streamStateHandle == null) {
- return null;
- }
+ if (asynchronousSnapshots) {
+ LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.",
+ streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
+ }
- KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
- final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
+ if (streamStateHandle != null) {
- return keyGroupsStateHandle;
+ KeyGroupRangeOffsets offsets =
+ new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+
+ final KeyGroupsStateHandle keyGroupsStateHandle =
+ new KeyGroupsStateHandle(offsets, streamStateHandle);
+
+ return keyGroupsStateHandle;
+ }
+ }
+
+ return null;
}
};
@@ -425,7 +459,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
- cancelStreamRegistry.registerClosable(fsDataInputStream);
+ cancelStreamRegistry.registerCloseable(fsDataInputStream);
try {
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
@@ -533,8 +567,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
} finally {
- cancelStreamRegistry.unregisterClosable(fsDataInputStream);
- IOUtils.closeQuietly(fsDataInputStream);
+ if (cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
+ IOUtils.closeQuietly(fsDataInputStream);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6089240..631cdfc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -709,7 +709,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
OperatorStateBackend operatorStateBackend = stateBackend.createOperatorStateBackend(env, opId);
// let operator state backend participate in the operator lifecycle, i.e. make it responsive to cancelation
- cancelables.registerClosable(operatorStateBackend);
+ cancelables.registerCloseable(operatorStateBackend);
// restore if we have some old state
if (null != restoreStateHandles) {
@@ -742,7 +742,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
getEnvironment().getTaskKvStateRegistry());
// let keyed state backend participate in the operator lifecycle, i.e. make it responsive to cancelation
- cancelables.registerClosable(keyedStateBackend);
+ cancelables.registerCloseable(keyedStateBackend);
// restore if we have some old state
Collection<KeyedStateHandle> restoreKeyedStateHandles = null;
@@ -933,7 +933,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
} finally {
- owner.cancelables.unregisterClosable(this);
+ owner.cancelables.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
@@ -1086,7 +1086,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
checkpointMetrics,
startAsyncPartNano);
- owner.cancelables.registerClosable(asyncCheckpointRunnable);
+ owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9bb91ad..811d700 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1117,7 +1117,7 @@ public class StreamTaskTest extends TestLogger {
holder.start();
try {
// cancellation should try and cancel this
- getCancelables().registerClosable(holder);
+ getCancelables().registerCloseable(holder);
// wait till the lock holder has the lock
latch.await();
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 9156f34..793e8f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -220,7 +220,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
environment,
operator.getClass().getSimpleName());
- mockTask.getCancelables().registerClosable(osb);
+ mockTask.getCancelables().registerCloseable(osb);
if (null != stateHandles) {
osb.restore(stateHandles);
http://git-wip-us.apache.org/repos/asf/flink/blob/5af463a9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 4d5fa71..829ac93 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -180,7 +180,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
case ROCKSDB_FULLY_ASYNC: {
String rocksDb = tempFolder.newFolder().getAbsolutePath();
- RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
rdb.setDbStoragePath(rocksDb);
this.stateBackend = rdb;
break;