You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/13 09:45:42 UTC

flink git commit: [FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages

Repository: flink
Updated Branches:
  refs/heads/master 1a658775c -> 68ac96e16


[FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages

This closes #4108.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68ac96e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68ac96e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68ac96e1

Branch: refs/heads/master
Commit: 68ac96e16c09d7aee64d3dc0e5629cc308fb087f
Parents: 1a65877
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jun 12 11:48:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 13 11:35:44 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/core/fs/FileSystemSafetyNet.java     | 2 --
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java   | 6 ++----
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java   | 3 +++
 3 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index 1391a33..c06ccac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -118,9 +118,7 @@ public class FileSystemSafetyNet {
 
 	/**
 	 * Sets the active safety-net registry for the current thread.
-	 * @deprecated This method should be removed after FLINK-6684 is implemented.
 	 */
-	@Deprecated
 	@Internal
 	public static void setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) {
 		REGISTRIES.set(registry);

http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e18628e..9dc6e34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1177,7 +1177,7 @@ public class Task implements Runnable, TaskActions {
 				Runnable runnable = new Runnable() {
 					@Override
 					public void run() {
-						// activate safety net for checkpointing thread
+						// set safety net from the task's context for checkpointing thread
 						LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
 						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
@@ -1200,9 +1200,7 @@ public class Task implements Runnable, TaskActions {
 									taskNameWithSubtask, executionId, t);
 							}
 						} finally {
-							// close and de-activate safety net for checkpointing thread
-							LOG.debug("Ensuring all FileSystem streams are closed for {}",
-									Thread.currentThread().getName());
+							FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
 						}
 					}
 				};

http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5fb5d2d..c35a6dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -892,6 +893,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		@Override
 		public void run() {
+			FileSystemSafetyNet.initializeSafetyNetForThread();
 			try {
 				// Keyed state handle future, currently only one (the head) operator can have this
 				KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
@@ -973,6 +975,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
 			} finally {
 				owner.cancelables.unregisterClosable(this);
+				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 			}
 		}