Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:46 UTC

[1/7] flink git commit: [FLINK-5192] [logging] Improve log config templates

Repository: flink
Updated Branches:
  refs/heads/master 3223a160f -> 7d66aaeb0


[FLINK-5192] [logging] Improve log config templates

This closes #2899.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d66aaeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d66aaeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d66aaeb

Branch: refs/heads/master
Commit: 7d66aaeb0fb52c5cd258e8b32ba8394eedd5d4ca
Parents: bf859e7
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:14:23 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../src/main/flink-bin/conf/log4j.properties    | 14 +++++++++-
 flink-dist/src/main/flink-bin/conf/logback.xml  | 28 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d66aaeb/flink-dist/src/main/flink-bin/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index 97ec653..8e00ce3 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -16,8 +16,20 @@
 # limitations under the License.
 ################################################################################
 
+# This affects logging for both user code and Flink
 log4j.rootLogger=INFO, file
 
+# Uncomment this if you want to _only_ change Flink's logging
+#log4j.logger.org.apache.flink=INFO
+
+# The following lines keep the log level of common libraries/connectors at
+# INFO. The root logger does not override this; you have to manually change
+# the log levels here.
+log4j.logger.akka=INFO
+log4j.logger.org.apache.kafka=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
+
 # Log all infos in the given file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
@@ -25,5 +37,5 @@ log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
-# suppress the irrelevant (wrong) warnings from the netty channel handler
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
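
With these template entries in place, tuning a single library is a one-line
override. A minimal sketch in the same log4j.properties syntax (the WARN
level for Kafka is illustrative, not part of this commit):

    # Root stays at INFO for Flink and user code
    log4j.rootLogger=INFO, file
    # Example override: only WARN and above from the Kafka client
    log4j.logger.org.apache.kafka=WARN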

http://git-wip-us.apache.org/repos/asf/flink/blob/7d66aaeb/flink-dist/src/main/flink-bin/conf/logback.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml
index 1147a70..f3c4331 100644
--- a/flink-dist/src/main/flink-bin/conf/logback.xml
+++ b/flink-dist/src/main/flink-bin/conf/logback.xml
@@ -25,8 +25,34 @@
         </encoder>
     </appender>
 
+    <!-- This affects logging for both user code and Flink -->
     <root level="INFO">
         <appender-ref ref="file"/>
     </root>
-</configuration>
 
+    <!-- Uncomment this if you want to only change Flink's logging -->
+    <!--<logger name="org.apache.flink" level="INFO">-->
+        <!--<appender-ref ref="file"/>-->
+    <!--</logger>-->
+
+    <!-- The following lines keep the log level of common libraries/connectors at
+         INFO. The root logger does not override this; you have to manually
+         change the log levels here. -->
+    <logger name="akka" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.kafka" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.hadoop" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.zookeeper" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+
+    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
+    <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
+        <appender-ref ref="file"/>
+    </logger>
+</configuration>
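
The equivalent per-library override in the logback template is a logger
element. A sketch (again, the WARN level for ZooKeeper is illustrative):

    <logger name="org.apache.zookeeper" level="WARN">
        <appender-ref ref="file"/>
    </logger>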


[2/7] flink git commit: [FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore

Posted by uc...@apache.org.
[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f91dd9fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f91dd9fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f91dd9fb

Branch: refs/heads/master
Commit: f91dd9fbaf392bc2968e974dddba4cda2a4f3be2
Parents: dc7d8ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/SubmittedJobGraph.java   |  2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        | 86 +++++++++++++-------
 2 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
 
 	@Override
 	public String toString() {
-		return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+		return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index b241712..c1dc656 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 */
 	private final PathChildrenCache pathCache;
 
+	/** The full configured base path including the namespace. */
+	private final String zooKeeperFullBasePath;
+
 	/** The external listener to be notified on races. */
 	private SubmittedJobGraphListener jobGraphListener;
 
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
+		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
 		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
+			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
 			List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
 
 			while (true) {
@@ -168,6 +173,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 
+			LOG.info("Found {} job graphs.", submitted.size());
+
 			if (submitted.size() != 0) {
 				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
 
@@ -193,6 +200,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
@@ -215,6 +224,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobGraph, "Job graph");
 		String path = getPathForJob(jobGraph.getJobId());
 
+		LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
 		boolean success = false;
 
 		while (!success) {
@@ -229,8 +240,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 						addedJobGraphs.add(jobGraph.getJobId());
 
-						LOG.info("Added {} to ZooKeeper.", jobGraph);
-
 						success = true;
 					}
 					catch (KeeperException.NodeExistsException ignored) {
@@ -252,6 +261,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 		}
+
+		LOG.info("Added {} to ZooKeeper.", jobGraph);
 	}
 
 	@Override
@@ -259,14 +270,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
 				jobGraphsInZooKeeper.removeAndDiscardState(path);
 
 				addedJobGraphs.remove(jobId);
-				LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 			}
 		}
+
+		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
 	/**
@@ -291,70 +305,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			}
 
 			switch (event.getType()) {
-				case CHILD_ADDED:
+				case CHILD_ADDED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_ADDED event notification for job {}.", jobId);
+
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
 								try {
 									// Whoa! This has been added by someone else. Or we were fast
 									// to remove it (false positive).
 									jobGraphListener.onAddedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
+				}
+				break;
 
-					break;
-
-				case CHILD_UPDATED:
+				case CHILD_UPDATED: {
 					// Nothing to do
-					break;
+				}
+				break;
+
+				case CHILD_REMOVED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_REMOVED event notification for job {}.", jobId);
 
-				case CHILD_REMOVED:
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
 								try {
 									// Oh oh. Someone else removed one of our job graphs. Mean!
 									jobGraphListener.onRemovedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
 
 							break;
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
-					break;
+				}
+				break;
 
-				case CONNECTION_SUSPENDED:
+				case CONNECTION_SUSPENDED: {
 					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
-							"graphs are not monitored (temporarily).");
-					break;
-				case CONNECTION_LOST:
+						"graphs are not monitored (temporarily).");
+				}
+				break;
+
+				case CONNECTION_LOST: {
 					LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
-							"graphs are not monitored (permanently).");
-					break;
+						"graphs are not monitored (permanently).");
+				}
+				break;
 
-				case CONNECTION_RECONNECTED:
+				case CONNECTION_RECONNECTED: {
 					LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
-							"graphs are monitored again.");
-					break;
-				case INITIALIZED:
+						"graphs are monitored again.");
+				}
+				break;
+
+				case INITIALIZED: {
 					LOG.info("SubmittedJobGraphsPathCacheListener initialized");
-					break;
+				}
+				break;
 			}
 		}
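
The new recover/add/remove messages are logged at DEBUG, so they stay hidden
under the shipped INFO templates. A one-line sketch for surfacing them with
the log4j template (the logger name is the fully qualified class changed
above):

    log4j.logger.org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore=DEBUG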
 


[4/7] flink git commit: [FLINK-5194] [logging] Log heartbeats on TRACE level

Posted by uc...@apache.org.
[FLINK-5194] [logging] Log heartbeats on TRACE level


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8228ac6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8228ac6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8228ac6e

Branch: refs/heads/master
Commit: 8228ac6e4d4e047e413ec316fc2ee3f5022b9afd
Parents: 3223a16
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:15:27 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/akka/FlinkUntypedActor.java  | 14 ++++++--------
 .../flink/runtime/instance/InstanceManager.java       |  4 +---
 .../apache/flink/runtime/jobmanager/JobManager.scala  |  2 +-
 3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8228ac6e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 5100d17..3255778 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.akka;
 
 import akka.actor.UntypedActor;
-
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +38,7 @@ import java.util.UUID;
  * a leader session ID option which is returned by getLeaderSessionID.
  */
 public abstract class FlinkUntypedActor extends UntypedActor {
-	
+
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	/**
@@ -56,16 +54,16 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 */
 	@Override
 	public final void onReceive(Object message) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+		if (LOG.isTraceEnabled()) {
+			LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
 
 			long start = System.nanoTime();
 
 			handleLeaderSessionID(message);
 
-			long duration = (System.nanoTime() - start)/ 1000000;
+			long duration = (System.nanoTime() - start) / 1_000_000;
 
-			LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+			LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
 		} else {
 			handleLeaderSessionID(message);
 		}
@@ -81,7 +79,7 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 * @throws Exception
 	 */
 	private void handleLeaderSessionID(Object message) throws Exception {
-		if(message instanceof LeaderSessionMessage) {
+		if (message instanceof LeaderSessionMessage) {
 			LeaderSessionMessage msg = (LeaderSessionMessage) message;
 			UUID expectedID = getLeaderSessionID();
 			UUID actualID = msg.leaderSessionID();

http://git-wip-us.apache.org/repos/asf/flink/blob/8228ac6e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 65909db..1c2d66f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -118,9 +118,7 @@ public class InstanceManager {
 
 			host.reportHeartBeat();
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received heartbeat from TaskManager " + host);
-			}
+			LOG.trace("Received heartbeat from TaskManager {}", host);
 
 			return true;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8228ac6e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7cddb2b..0ffca55 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1031,7 +1031,7 @@ class JobManager(
       )
 
     case Heartbeat(instanceID, accumulators) =>
-      log.debug(s"Received heartbeat message from $instanceID.")
+      log.trace(s"Received heartbeat message from $instanceID.")
 
       updateAccumulators(accumulators)
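
Heartbeat logging now requires TRACE. A sketch of re-enabling it for just the
classes touched in this commit, in the log4j template syntax (logger names
follow the class names above):

    log4j.logger.org.apache.flink.runtime.instance.InstanceManager=TRACE
    log4j.logger.org.apache.flink.runtime.jobmanager.JobManager=TRACE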
 


[6/7] flink git commit: [FLINK-5207] [logging] Decrease HadoopFileSystem logging

Posted by uc...@apache.org.
[FLINK-5207] [logging] Decrease HadoopFileSystem logging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf859e77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf859e77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf859e77

Branch: refs/heads/master
Commit: bf859e77abb2f1ee14b0bdf18cdb2fe526369203
Parents: f91dd9f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:08:53 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 53 +++++++++-----------
 1 file changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf859e77/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 5d7173b..0eab032 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -17,25 +17,23 @@
  */
 package org.apache.flink.runtime.fs.hdfs;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
-
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.UnknownHostException;
 
 /**
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
@@ -99,7 +97,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 					fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
+						LOG.debug("Loaded '{}' as HDFS class.", fsClass.getName());
 					}
 				}
 				else {
@@ -114,8 +112,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		{
 			// first of all, check for a user-defined hdfs class
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
-						+ HDFS_IMPLEMENTATION_KEY + "'.");
+				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
+						HDFS_IMPLEMENTATION_KEY);
 			}
 
 			Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
@@ -126,12 +124,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 					fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
+						LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName());
 					}
 				}
 				else {
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
+						LOG.debug("HDFS class specified by {} is of wrong type.", HDFS_IMPLEMENTATION_KEY);
 					}
 
 					throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
@@ -141,7 +139,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 			else {
 				// load the default HDFS class
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
+					LOG.debug("Trying to load default HDFS implementation {}.", DEFAULT_HDFS_CLASS);
 				}
 
 				try {
@@ -190,14 +188,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		if (hdfsDefaultPath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
 		} else {
-			LOG.debug("Cannot find hdfs-default configuration file");
+			LOG.trace("{} configuration key for hdfs-default configuration file not set", ConfigConstants.HDFS_DEFAULT_CONFIG);
 		}
 
 		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
 		if (hdfsSitePath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
 		} else {
-			LOG.debug("Cannot find hdfs-site configuration file");
+			LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
 		}
 		
 		// 2. Approach environment variables
@@ -215,17 +213,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 				if (new File(possibleHadoopConfPath).exists()) {
 					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
 						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						}
+					} else {
+						LOG.debug("File {}/core-site.xml not found.", possibleHadoopConfPath);
 					}
+
 					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
 						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						}
+					} else {
+						LOG.debug("File {}/hdfs-site.xml not found.", possibleHadoopConfPath);
 					}
 				}
 			}
@@ -289,7 +284,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 			}
 			
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("fs.defaultFS is set to " + configEntry);
+				LOG.debug("fs.defaultFS is set to {}", configEntry);
 			}
 			
 			if (configEntry == null) {
@@ -464,7 +459,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
 
 		if(clazz != null && LOG.isDebugEnabled()) {
-			LOG.debug("Flink supports "+scheme+" with the Hadoop file system wrapper, impl "+clazz);
+			LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
 		}
 		return clazz;
 	}
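
The recurring change in this commit swaps string concatenation for SLF4J's {}
placeholders, which skips building the message string whenever the level is
disabled. A self-contained sketch of the idiom (this class is hypothetical,
not part of the commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PlaceholderLogging {
        private static final Logger LOG = LoggerFactory.getLogger(PlaceholderLogging.class);

        public static void main(String[] args) {
            String scheme = "hdfs";
            // Eager: the concatenated string is built even if DEBUG is disabled.
            LOG.debug("Flink supports " + scheme + " with the Hadoop file system wrapper");
            // Lazy: formatting happens only after the level check passes.
            LOG.debug("Flink supports {} with the Hadoop file system wrapper", scheme);
        }
    }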


[5/7] flink git commit: [FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor

Posted by uc...@apache.org.
[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67bd8277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67bd8277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67bd8277

Branch: refs/heads/master
Commit: 67bd8277d1dc1179c30d2dbad0922122ed6f49ee
Parents: dc5650a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java         |  3 ---
 .../partition/consumer/SingleInputGate.java       | 18 +++++++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index a72b92f..9b3ce5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -143,8 +142,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 					consumedPartitionId, partitionLocation);
 		}
 
-		LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
 		return icdd;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f57542..d7ed33c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -554,8 +553,11 @@ public class SingleInputGate implements InputGate {
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
 
-		for (int i = 0; i < inputChannels.length; i++) {
+		int numLocalChannels = 0;
+		int numRemoteChannels = 0;
+		int numUnknownChannels = 0;
 
+		for (int i = 0; i < inputChannels.length; i++) {
 			final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
 			final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
 
@@ -567,6 +569,8 @@ public class SingleInputGate implements InputGate {
 					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
+
+				numLocalChannels++;
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -576,6 +580,8 @@ public class SingleInputGate implements InputGate {
 					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
+
+				numRemoteChannels++;
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -586,6 +592,8 @@ public class SingleInputGate implements InputGate {
 					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
+
+				numUnknownChannels++;
 			}
 			else {
 				throw new IllegalStateException("Unexpected partition location.");
@@ -594,7 +602,11 @@ public class SingleInputGate implements InputGate {
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			inputChannels.length,
+			numLocalChannels,
+			numRemoteChannels,
+			numUnknownChannels);
 
 		return inputGate;
 	}
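
Rendered, the summary produced by the format string above looks like this
(the counts are illustrative):

    Created 4 input channels (local: 2, remote: 1, unknown: 1).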


[7/7] flink git commit: [FLINK-5198] [logging] Improve TaskState toString

Posted by uc...@apache.org.
[FLINK-5198] [logging] Improve TaskState toString


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc7d8ec2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc7d8ec2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc7d8ec2

Branch: refs/heads/master
Commit: dc7d8ec2c4d03c42e3d582947a3fe39a274d7f4b
Parents: 67bd827
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:15:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java    |  8 ++++++--
 .../apache/flink/runtime/checkpoint/TaskState.java   | 15 +++++++++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d8ec2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 638e0a7..2242c14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -653,9 +653,13 @@ public class CheckpointCoordinator {
 
 							if (LOG.isDebugEnabled()) {
 								StringBuilder builder = new StringBuilder();
-								for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
-									builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+								builder.append("Checkpoint state: ");
+								for (TaskState state : completed.getTaskStates().values()) {
+									builder.append(state);
+									builder.append(", ");
 								}
+								// Remove the trailing ", " separator
+								builder.delete(builder.length() - 2, builder.length());
 
 								LOG.debug(builder.toString());
 							}
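
The manual trim of the trailing ", " can also be expressed with
java.util.StringJoiner (Java 8+). A standalone sketch of the same joining
logic, not the committed code:

    import java.util.Arrays;
    import java.util.List;
    import java.util.StringJoiner;

    public class CheckpointStateJoin {
        public static void main(String[] args) {
            List<String> taskStates = Arrays.asList("TaskState(v1)", "TaskState(v2)");
            // Prefix and separator mirror the StringBuilder logic above,
            // with no manual delete of the last two chars.
            StringJoiner joiner = new StringJoiner(", ", "Checkpoint state: ", "");
            for (String state : taskStates) {
                joiner.add(state);
            }
            System.out.println(joiner); // Checkpoint state: TaskState(v1), TaskState(v2)
        }
    }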

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d8ec2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 3cdc5e9..76f1c51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -130,7 +129,7 @@ public class TaskState implements StateObject {
 
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		long result = 0L;
 
 		for (int i = 0; i < parallelism; i++) {
@@ -164,4 +163,16 @@ public class TaskState implements StateObject {
 	public Map<Integer, SubtaskState> getSubtaskStates() {
 		return Collections.unmodifiableMap(subtaskStates);
 	}
+
+	@Override
+	public String toString() {
+		// KvStates are always null in 1.1. Don't print this, as it might
+		// confuse users who don't care about how we store it internally.
+		return "TaskState(" +
+			"jobVertexID: " + jobVertexID +
+			", parallelism: " + parallelism +
+			", sub task states: " + subtaskStates.size() +
+			", total size (bytes): " + getStateSize() +
+			')';
+	}
 }


[3/7] flink git commit: [FLINK-5201] [logging] Log loaded config properties on INFO level

Posted by uc...@apache.org.
[FLINK-5201] [logging] Log loaded config properties on INFO level


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc5650a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc5650a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc5650a3

Branch: refs/heads/master
Commit: dc5650a3ea15b950aa906e456960e1cf7c94d3a0
Parents: 8228ac6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:00:02 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/configuration/GlobalConfiguration.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc5650a3/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 8d550d7..ecfbc72 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -144,7 +144,7 @@ public final class GlobalConfiguration {
 						continue;
 					}
 
-					LOG.debug("Loading configuration property: {}, {}", key, value);
+					LOG.info("Loading configuration property: {}, {}", key, value);
 					config.setString(key, value);
 				}
 			}
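
Every loaded property is now printed at INFO on startup. Deployments that find
this too verbose can dial back just this class; a sketch in the log4j template
syntax (the WARN choice is illustrative):

    log4j.logger.org.apache.flink.configuration.GlobalConfiguration=WARN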