You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 16:42:35 UTC

[1/2] flink git commit: [FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet

Repository: flink
Updated Branches:
  refs/heads/release-1.2 db3c5f388 -> b5ec14641


[FLINK-5895] [runtime] Decrease logging aggressiveness of FileSystemSafetyNet


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b29f5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b29f5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b29f5a

Branch: refs/heads/release-1.2
Commit: 74b29f5a3dc4f1413bbf8addb6b4234a5bfe6581
Parents: db3c5f3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 23 16:21:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 16:26:01 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/core/fs/FileSystem.java   | 6 ------
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java | 8 ++++++++
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74b29f5a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d8efcbc..991c718 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -33,8 +33,6 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -86,8 +84,6 @@ public abstract class FileSystem {
 
 	private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
 
-	private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
-
 	/** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and
 	 * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races */
 	private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true);
@@ -107,7 +103,6 @@ public abstract class FileSystem {
 
 		SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
 		REGISTRIES.set(newRegistry);
-		LOG.info("Created new CloseableRegistry " + newRegistry + " for {}", Thread.currentThread().getName());
 	}
 
 	/**
@@ -118,7 +113,6 @@ public abstract class FileSystem {
 	public static void closeAndDisposeFileSystemCloseableRegistryForThread() {
 		SafetyNetCloseableRegistry registry = REGISTRIES.get();
 		if (null != registry) {
-			LOG.info("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
 			REGISTRIES.remove();
 			IOUtils.closeQuietly(registry);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/74b29f5a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ff81827..d242d7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -539,6 +539,7 @@ public class Task implements Runnable, TaskActions {
 			// ----------------------------
 
 			// activate safety net for task thread
+			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
 			FileSystem.createAndSetFileSystemCloseableRegistryForThread();
 
 			// first of all, get a user-code classloader
@@ -763,7 +764,9 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
+
 				// close and de-activate safety net for task thread
+				LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
 				FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
 
 				notifyFinalState();
@@ -1106,7 +1109,9 @@ public class Task implements Runnable, TaskActions {
 					@Override
 					public void run() {
 						// activate safety net for checkpointing thread
+						LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
 						FileSystem.createAndSetFileSystemCloseableRegistryForThread();
+
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
 							if (!success) {
@@ -1127,6 +1132,9 @@ public class Task implements Runnable, TaskActions {
 							}
 						} finally {
 							// close and de-activate safety net for checkpointing thread
+							LOG.debug("Ensuring all FileSystem streams are closed for {}",
+									Thread.currentThread().getName());
+
 							FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
 						}
 					}


[2/2] flink git commit: [FLINK-5877] [docs] Fix Async I/O Scala snippet

Posted by se...@apache.org.
[FLINK-5877] [docs] Fix Async I/O Scala snippet

This closes #3383


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5ec1464
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5ec1464
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5ec1464

Branch: refs/heads/release-1.2
Commit: b5ec146413bedf55867e15652c7e29f1e4e2d220
Parents: 74b29f5
Author: Andrea Sella <an...@radicalbit.io>
Authored: Tue Feb 21 21:18:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 17:37:27 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/asyncio.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5ec1464/docs/dev/stream/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/asyncio.md b/docs/dev/stream/asyncio.md
index dbf2b9c..c4414b4 100644
--- a/docs/dev/stream/asyncio.md
+++ b/docs/dev/stream/asyncio.md
@@ -139,7 +139,7 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
     lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
 
     /** The context used for the future callbacks */
-    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()))
+    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
 
 
     override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {
@@ -150,8 +150,8 @@ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
         // set the callback to be executed once the request by the client is complete
         // the callback simply forwards the result to the collector
         resultFuture.onSuccess {
-            case result: String => asyncCollector.collect(Collections.singleton((str, result)));
-        })
+            case result: String => asyncCollector.collect(Iterable((str, result)));
+        }
     }
 }