You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/24 11:28:32 UTC

[4/4] flink git commit: [FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet

[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6e6e7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6e6e7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6e6e7ec

Branch: refs/heads/master
Commit: f6e6e7ecf4d287f76698302417a9ff2ffc869477
Parents: 15ae922
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:21:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/core/fs/FileSystemSafetyNet.java    | 7 -------
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java  | 7 +++++++
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index b18cb13..eb28504 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -21,9 +21,6 @@ package org.apache.flink.core.fs;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.IOUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -65,8 +62,6 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class FileSystemSafetyNet {
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
-
 	/** The map from thread to the safety net registry for that thread */
 	private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
 
@@ -93,7 +88,6 @@ public class FileSystemSafetyNet {
 
 		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
 		REGISTRIES.set(newRegistry);
-		LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
 	}
 
 	/**
@@ -107,7 +101,6 @@ public class FileSystemSafetyNet {
 	public static void closeSafetyNetAndGuardedResourcesForThread() {
 		SafetyNetCloseableRegistry registry = REGISTRIES.get();
 		if (null != registry) {
-			LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
 			REGISTRIES.remove();
 			IOUtils.closeQuietly(registry);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c9f17b8..8732c60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -553,6 +553,7 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------
 
 			// activate safety net for task thread
+			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
 			FileSystemSafetyNet.initializeSafetyNetForThread();
 
 			// first of all, get a user-code classloader
@@ -792,6 +793,7 @@ public class Task implements Runnable, TaskActions {
 				removeCachedFiles(distributedCacheEntries, fileCache);
 
 				// close and de-activate safety net for task thread
+				LOG.info("Ensuring all FileSystem streams are closed for task {}", this); 
 				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
 				notifyFinalState();
@@ -1138,7 +1140,9 @@ public class Task implements Runnable, TaskActions {
 					@Override
 					public void run() {
 						// activate safety net for checkpointing thread
+						LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
 						FileSystemSafetyNet.initializeSafetyNetForThread();
+
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 							if (!success) {
@@ -1159,6 +1163,9 @@ public class Task implements Runnable, TaskActions {
 							}
 						} finally {
 							// close and de-activate safety net for checkpointing thread
+							LOG.debug("Ensuring all FileSystem streams are closed for {}",
+									Thread.currentThread().getName());
+
 							FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 						}
 					}