You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/24 11:28:32 UTC
[4/4] flink git commit: [FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet
[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6e6e7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6e6e7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6e6e7ec
Branch: refs/heads/master
Commit: f6e6e7ecf4d287f76698302417a9ff2ffc869477
Parents: 15ae922
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:21:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 24 12:15:18 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/core/fs/FileSystemSafetyNet.java | 7 -------
.../main/java/org/apache/flink/runtime/taskmanager/Task.java | 7 +++++++
2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index b18cb13..eb28504 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -21,9 +21,6 @@ package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.URI;
import static org.apache.flink.util.Preconditions.checkState;
@@ -65,8 +62,6 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public class FileSystemSafetyNet {
- private static final Logger LOG = LoggerFactory.getLogger(FileSystemSafetyNet.class);
-
/** The map from thread to the safety net registry for that thread */
private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
@@ -93,7 +88,6 @@ public class FileSystemSafetyNet {
SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
REGISTRIES.set(newRegistry);
- LOG.info("Created new CloseableRegistry {} for {}", newRegistry, Thread.currentThread().getName());
}
/**
@@ -107,7 +101,6 @@ public class FileSystemSafetyNet {
public static void closeSafetyNetAndGuardedResourcesForThread() {
SafetyNetCloseableRegistry registry = REGISTRIES.get();
if (null != registry) {
- LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
REGISTRIES.remove();
IOUtils.closeQuietly(registry);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6e6e7ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c9f17b8..8732c60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -553,6 +553,7 @@ public class Task implements Runnable, TaskActions {
// ----------------------------
// activate safety net for task thread
+ LOG.info("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
// first of all, get a user-code classloader
@@ -792,6 +793,7 @@ public class Task implements Runnable, TaskActions {
removeCachedFiles(distributedCacheEntries, fileCache);
// close and de-activate safety net for task thread
+ LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
notifyFinalState();
@@ -1138,7 +1140,9 @@ public class Task implements Runnable, TaskActions {
@Override
public void run() {
// activate safety net for checkpointing thread
+ LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.initializeSafetyNetForThread();
+
try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
@@ -1159,6 +1163,9 @@ public class Task implements Runnable, TaskActions {
}
} finally {
// close and de-activate safety net for checkpointing thread
+ LOG.debug("Ensuring all FileSystem streams are closed for {}",
+ Thread.currentThread().getName());
+
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}