You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/13 09:45:42 UTC
flink git commit: [FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages
Repository: flink
Updated Branches:
refs/heads/master 1a658775c -> 68ac96e16
[FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages
This closes #4108.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68ac96e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68ac96e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68ac96e1
Branch: refs/heads/master
Commit: 68ac96e16c09d7aee64d3dc0e5629cc308fb087f
Parents: 1a65877
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jun 12 11:48:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 13 11:35:44 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/core/fs/FileSystemSafetyNet.java | 2 --
.../main/java/org/apache/flink/runtime/taskmanager/Task.java | 6 ++----
.../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 3 +++
3 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index 1391a33..c06ccac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -118,9 +118,7 @@ public class FileSystemSafetyNet {
/**
* Sets the active safety-net registry for the current thread.
- * @deprecated This method should be removed after FLINK-6684 is implemented.
*/
- @Deprecated
@Internal
public static void setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) {
REGISTRIES.set(registry);
http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e18628e..9dc6e34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1177,7 +1177,7 @@ public class Task implements Runnable, TaskActions {
Runnable runnable = new Runnable() {
@Override
public void run() {
- // activate safety net for checkpointing thread
+ // set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
@@ -1200,9 +1200,7 @@ public class Task implements Runnable, TaskActions {
taskNameWithSubtask, executionId, t);
}
} finally {
- // close and de-activate safety net for checkpointing thread
- LOG.debug("Ensuring all FileSystem streams are closed for {}",
- Thread.currentThread().getName());
+ FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5fb5d2d..c35a6dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -892,6 +893,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@Override
public void run() {
+ FileSystemSafetyNet.initializeSafetyNetForThread();
try {
// Keyed state handle future, currently only one (the head) operator can have this
KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
@@ -973,6 +975,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
} finally {
owner.cancelables.unregisterClosable(this);
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}