You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/30 15:22:22 UTC

[1/7] flink git commit: [FLINK-5192] [logging] Improve log config templates

Repository: flink
Updated Branches:
  refs/heads/release-1.1 569a9666f -> 357b932b8


[FLINK-5192] [logging] Improve log config templates

This closes #2899.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/357b932b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/357b932b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/357b932b

Branch: refs/heads/release-1.1
Commit: 357b932b822d54f1e3913706aa8d4141e9461197
Parents: 28e44e7
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:14:23 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../src/main/flink-bin/conf/log4j.properties    | 14 +++++++++-
 flink-dist/src/main/flink-bin/conf/logback.xml  | 28 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/357b932b/flink-dist/src/main/flink-bin/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index 97ec653..8e00ce3 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -16,8 +16,20 @@
 # limitations under the License.
 ################################################################################
 
+# This affects logging for both user code and Flink
 log4j.rootLogger=INFO, file
 
+# Uncomment this if you want to _only_ change Flink's logging
+#log4j.logger.org.apache.flink=INFO
+
+# The following lines keep common libraries/connectors at log level INFO.
+# Changing the root logger's level does not override these settings; to change
+# the log level of these libraries, you have to edit the lines below manually.
+log4j.logger.akka=INFO
+log4j.logger.org.apache.kafka=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
+
 # Log all infos in the given file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
@@ -25,5 +37,5 @@ log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
-# suppress the irrelevant (wrong) warnings from the netty channel handler
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

http://git-wip-us.apache.org/repos/asf/flink/blob/357b932b/flink-dist/src/main/flink-bin/conf/logback.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml
index 1147a70..f3c4331 100644
--- a/flink-dist/src/main/flink-bin/conf/logback.xml
+++ b/flink-dist/src/main/flink-bin/conf/logback.xml
@@ -25,8 +25,34 @@
         </encoder>
     </appender>
 
+    <!-- This affects logging for both user code and Flink -->
     <root level="INFO">
         <appender-ref ref="file"/>
     </root>
-</configuration>
 
+    <!-- Uncomment this if you want to only change Flink's logging -->
+    <!--<logger name="org.apache.flink" level="INFO">-->
+        <!--<appender-ref ref="file"/>-->
+    <!--</logger>-->
+
+    <!-- The following loggers keep common libraries/connectors at log level INFO.
+         Changing the root logger's level does not override these settings; to change
+         the log level of these libraries, you have to edit the loggers below manually. -->
+    <logger name="akka" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.kafka" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.hadoop" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+    <logger name="org.apache.zookeeper" level="INFO">
+        <appender-ref ref="file"/>
+    </logger>
+
+    <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
+    <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
+        <appender-ref ref="file"/>
+    </logger>
+</configuration>


[6/7] flink git commit: [FLINK-5207] [logging] Decrease HadoopFileSystem logging

Posted by uc...@apache.org.
[FLINK-5207] [logging] Decrease HadoopFileSystem logging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28e44e7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28e44e7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28e44e7b

Branch: refs/heads/release-1.1
Commit: 28e44e7bf0fd3aa5c55c74a1c39c22cd77935494
Parents: 8d949c9
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:08:53 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 53 +++++++++-----------
 1 file changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28e44e7b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 4e05ebe..f747112 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -17,25 +17,23 @@
  */
 package org.apache.flink.runtime.fs.hdfs;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
-
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.UnknownHostException;
 
 /**
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
@@ -99,7 +97,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 					fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
+						LOG.debug("Loaded '{}' as HDFS class.", fsClass.getName());
 					}
 				}
 				else {
@@ -114,8 +112,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		{
 			// first of all, check for a user-defined hdfs class
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
-						+ HDFS_IMPLEMENTATION_KEY + "'.");
+				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
+						HDFS_IMPLEMENTATION_KEY);
 			}
 
 			Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
@@ -126,12 +124,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 					fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
+						LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName() );
 					}
 				}
 				else {
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
+						LOG.debug("HDFS class specified by {} is of wrong type.", HDFS_IMPLEMENTATION_KEY);
 					}
 
 					throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
@@ -141,7 +139,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 			else {
 				// load the default HDFS class
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
+					LOG.debug("Trying to load default HDFS implementation {}.", DEFAULT_HDFS_CLASS);
 				}
 
 				try {
@@ -186,14 +184,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		if (hdfsDefaultPath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
 		} else {
-			LOG.debug("Cannot find hdfs-default configuration file");
+			LOG.trace("{} configuration key for hdfs-default configuration file not set", ConfigConstants.HDFS_DEFAULT_CONFIG);
 		}
 
 		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
 		if (hdfsSitePath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
 		} else {
-			LOG.debug("Cannot find hdfs-site configuration file");
+			LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
 		}
 		
 		// 2. Approach environment variables
@@ -211,17 +209,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 				if (new File(possibleHadoopConfPath).exists()) {
 					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
 						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						}
+					} else {
+						LOG.debug("File {}/core-site.xml not found.", possibleHadoopConfPath);
 					}
+
 					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
 						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						}
+					} else {
+						LOG.debug("File {}/hdfs-site.xml not found.", possibleHadoopConfPath);
 					}
 				}
 			}
@@ -285,7 +280,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 			}
 			
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("fs.defaultFS is set to " + configEntry);
+				LOG.debug("fs.defaultFS is set to {}", configEntry);
 			}
 			
 			if (configEntry == null) {
@@ -460,7 +455,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
 
 		if(clazz != null && LOG.isDebugEnabled()) {
-			LOG.debug("Flink supports "+scheme+" with the Hadoop file system wrapper, impl "+clazz);
+			LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
 		}
 		return clazz;
 	}


[2/7] flink git commit: [FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore

Posted by uc...@apache.org.
[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d949c96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d949c96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d949c96

Branch: refs/heads/release-1.1
Commit: 8d949c966b8916213e9b57996aaba0fe7c0e13fa
Parents: ee478fe
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/SubmittedJobGraph.java   |  2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        | 87 +++++++++++++-------
 2 files changed, 57 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d949c96/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
 
 	@Override
 	public String toString() {
-		return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+		return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d949c96/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a1dd14b..7324c07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 */
 	private final PathChildrenCache pathCache;
 
+	/** The full configured base path including the namespace. */
+	private final String zooKeeperFullBasePath;
+
 	/** The external listener to be notified on races. */
 	private SubmittedJobGraphListener jobGraphListener;
 
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
+		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
 		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
+			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
+
 			List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
 
 			while (true) {
@@ -168,6 +174,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 
+			LOG.info("Found {} job graphs.", submitted.size());
+
 			if (submitted.size() != 0) {
 				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
 
@@ -195,6 +203,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
@@ -221,6 +231,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobGraph, "Job graph");
 		String path = getPathForJob(jobGraph.getJobId());
 
+		LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
 		boolean success = false;
 
 		while (!success) {
@@ -235,8 +247,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 						addedJobGraphs.add(jobGraph.getJobId());
 
-						LOG.info("Added {} to ZooKeeper.", jobGraph);
-
 						success = true;
 					}
 					catch (KeeperException.NodeExistsException ignored) {
@@ -258,6 +268,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 		}
+
+		LOG.info("Added {} to ZooKeeper.", jobGraph);
 	}
 
 	@Override
@@ -265,14 +277,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
 				jobGraphsInZooKeeper.removeAndDiscardState(path);
 
 				addedJobGraphs.remove(jobId);
-				LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 			}
 		}
+
+		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
 	/**
@@ -297,70 +312,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			}
 
 			switch (event.getType()) {
-				case CHILD_ADDED:
+				case CHILD_ADDED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
 								try {
 									// Whoa! This has been added by someone else. Or we were fast
 									// to remove it (false positive).
 									jobGraphListener.onAddedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
+				}
+				break;
 
-					break;
-
-				case CHILD_UPDATED:
+				case CHILD_UPDATED: {
 					// Nothing to do
-					break;
+				}
+				break;
+
+				case CHILD_REMOVED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
 
-				case CHILD_REMOVED:
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
 								try {
 									// Oh oh. Someone else removed one of our job graphs. Mean!
 									jobGraphListener.onRemovedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
 
 							break;
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
-					break;
+				}
+				break;
 
-				case CONNECTION_SUSPENDED:
+				case CONNECTION_SUSPENDED: {
 					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
-							"graphs are not monitored (temporarily).");
-					break;
-				case CONNECTION_LOST:
+						"graphs are not monitored (temporarily).");
+				}
+				break;
+
+				case CONNECTION_LOST: {
 					LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
-							"graphs are not monitored (permanently).");
-					break;
+						"graphs are not monitored (permanently).");
+				}
+				break;
 
-				case CONNECTION_RECONNECTED:
+				case CONNECTION_RECONNECTED: {
 					LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
-							"graphs are monitored again.");
-					break;
-				case INITIALIZED:
+						"graphs are monitored again.");
+				}
+				break;
+
+				case INITIALIZED: {
 					LOG.info("SubmittedJobGraphsPathCacheListener initialized");
-					break;
+				}
+				break;
 			}
 		}
 


[7/7] flink git commit: [FLINK-5198] [logging] Improve TaskState toString

Posted by uc...@apache.org.
[FLINK-5198] [logging] Improve TaskState toString


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee478fe2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee478fe2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee478fe2

Branch: refs/heads/release-1.1
Commit: ee478fe278d5893048f015c2c1cc6dbfc7c68d8b
Parents: 7b9a444
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:15:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java |  8 ++++++--
 .../org/apache/flink/runtime/checkpoint/TaskState.java  | 12 ++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee478fe2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0cf944c..24cc3cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -693,9 +693,13 @@ public class CheckpointCoordinator {
 
 							if (LOG.isDebugEnabled()) {
 								StringBuilder builder = new StringBuilder();
-								for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
-									builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+								builder.append("Checkpoint state: ");
+								for (TaskState state : completed.getTaskStates().values()) {
+									builder.append(state);
+									builder.append(", ");
 								}
+								// Remove last two chars ", "
+								builder.delete(builder.length() - 2, builder.length());
 
 								LOG.debug(builder.toString());
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee478fe2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index ac4503d..14f8caa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -168,4 +168,16 @@ public class TaskState implements Serializable {
 	public int hashCode() {
 		return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
 	}
+
+	@Override
+	public String toString() {
+		// KvStates are always null in 1.1. Don't print this as it might
+		// confuse users that don't care about how we store it internally.
+		return "TaskState(" +
+			"jobVertexID: " + jobVertexID +
+			", parallelism: " + parallelism +
+			", sub task states: " + subtaskStates.size() +
+			", total size (bytes): " + getStateSize() +
+			')';
+	}
 }


[3/7] flink git commit: [FLINK-5201] [logging] Log loaded config properties on INFO level

Posted by uc...@apache.org.
[FLINK-5201] [logging] Log loaded config properties on INFO level


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8ade638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8ade638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8ade638

Branch: refs/heads/release-1.1
Commit: c8ade638b79fcf6816c386c1070c81b83bd18916
Parents: ffe6b6b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:00:02 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/configuration/GlobalConfiguration.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8ade638/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 7e50486..14a6ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -253,7 +253,7 @@ public final class GlobalConfiguration {
 							continue;
 						}
 	
-						LOG.debug("Loading configuration property: {}, {}", key, value);
+						LOG.info("Loading configuration property: {}, {}", key, value);
 	
 						this.config.setString(key, value);
 					}
@@ -372,7 +372,7 @@ public final class GlobalConfiguration {
 
 					if (key != null && value != null) {
 						// Put key, value pair into the map
-						LOG.debug("Loading configuration property: {}, {}", key, value);
+						LOG.info("Loading configuration property: {}, {}", key, value);
 						this.config.setString(key, value);
 					} else {
 						LOG.warn("Error while reading configuration: Cannot read property " + propNumber);


[4/7] flink git commit: [FLINK-5194] [logging] Log heartbeats on TRACE level

Posted by uc...@apache.org.
[FLINK-5194] [logging] Log heartbeats on TRACE level


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffe6b6b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffe6b6b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffe6b6b5

Branch: refs/heads/release-1.1
Commit: ffe6b6b595cf80d4682e273803930f62139af9c0
Parents: 569a966
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:15:27 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/akka/FlinkUntypedActor.java  | 14 ++++++--------
 .../flink/runtime/instance/InstanceManager.java       |  4 +---
 .../apache/flink/runtime/jobmanager/JobManager.scala  |  2 +-
 3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffe6b6b5/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 5100d17..3255778 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.akka;
 
 import akka.actor.UntypedActor;
-
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +38,7 @@ import java.util.UUID;
  * a leader session ID option which is returned by getLeaderSessionID.
  */
 public abstract class FlinkUntypedActor extends UntypedActor {
-	
+
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	/**
@@ -56,16 +54,16 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 */
 	@Override
 	public final void onReceive(Object message) throws Exception {
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+		if(LOG.isTraceEnabled()) {
+			LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
 
 			long start = System.nanoTime();
 
 			handleLeaderSessionID(message);
 
-			long duration = (System.nanoTime() - start)/ 1000000;
+			long duration = (System.nanoTime() - start)/ 1_000_000;
 
-			LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+			LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
 		} else {
 			handleLeaderSessionID(message);
 		}
@@ -81,7 +79,7 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 * @throws Exception
 	 */
 	private void handleLeaderSessionID(Object message) throws Exception {
-		if(message instanceof LeaderSessionMessage) {
+		if (message instanceof LeaderSessionMessage) {
 			LeaderSessionMessage msg = (LeaderSessionMessage) message;
 			UUID expectedID = getLeaderSessionID();
 			UUID actualID = msg.leaderSessionID();

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe6b6b5/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0d0d4c7..3fe92a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -124,9 +124,7 @@ public class InstanceManager {
 			host.reportHeartBeat();
 			host.setMetricsReport(lastMetricsReport);
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Received heartbeat from TaskManager " + host);
-			}
+			LOG.trace("Received heartbeat from TaskManager {}", host);
 
 			return true;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe6b6b5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cf60d4e..9061db4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -927,7 +927,7 @@ class JobManager(
       )
 
     case Heartbeat(instanceID, metricsReport, accumulators) =>
-      log.debug(s"Received heartbeat message from $instanceID.")
+      log.trace(s"Received heartbeat message from $instanceID.")
 
       updateAccumulators(accumulators)
 


[5/7] flink git commit: [FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor

Posted by uc...@apache.org.
[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b9a4445
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b9a4445
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b9a4445

Branch: refs/heads/release-1.1
Commit: 7b9a4445981ac8993af7a53cf057666e78c92140
Parents: c8ade63
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java         |  3 ---
 .../partition/consumer/SingleInputGate.java       | 18 +++++++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b9a4445/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 6b87e69..24b95ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -135,8 +134,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 					consumedPartitionId, partitionLocation);
 		}
 
-		LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
 		return icdd;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b9a4445/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f44fbc..1550b0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -573,8 +572,11 @@ public class SingleInputGate implements InputGate {
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
 
-		for (int i = 0; i < inputChannels.length; i++) {
+		int numLocalChannels = 0;
+		int numRemoteChannels = 0;
+		int numUnknownChannels = 0;
 
+		for (int i = 0; i < inputChannels.length; i++) {
 			final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
 			final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
 
@@ -585,6 +587,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 						metrics
 				);
+
+				numLocalChannels++;
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -593,6 +597,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 						metrics
 				);
+
+				numRemoteChannels++;
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -602,6 +608,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 						metrics
 				);
+
+				numUnknownChannels++;
 			}
 			else {
 				throw new IllegalStateException("Unexpected partition location.");
@@ -610,7 +618,11 @@ public class SingleInputGate implements InputGate {
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			inputChannels.length,
+			numLocalChannels,
+			numRemoteChannels,
+			numUnknownChannels);
 
 		return inputGate;
 	}