You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/30 15:22:22 UTC
[1/7] flink git commit: [FLINK-5192] [logging] Improve log config templates
Repository: flink
Updated Branches:
refs/heads/release-1.1 569a9666f -> 357b932b8
[FLINK-5192] [logging] Improve log config templates
This closes #2899.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/357b932b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/357b932b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/357b932b
Branch: refs/heads/release-1.1
Commit: 357b932b822d54f1e3913706aa8d4141e9461197
Parents: 28e44e7
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:14:23 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../src/main/flink-bin/conf/log4j.properties | 14 +++++++++-
flink-dist/src/main/flink-bin/conf/logback.xml | 28 +++++++++++++++++++-
2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/357b932b/flink-dist/src/main/flink-bin/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index 97ec653..8e00ce3 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -16,8 +16,20 @@
# limitations under the License.
################################################################################
+# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file
+# Uncomment this if you want to _only_ change Flink's logging
+#log4j.logger.org.apache.flink=INFO
+
+# The following lines pin common libraries/connectors to log level INFO.
+# Changing the root logger level does not override these entries; to change
+# their log levels, edit them here.
+log4j.logger.akka=INFO
+log4j.logger.org.apache.kafka=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
+
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
@@ -25,5 +37,5 @@ log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-# suppress the irrelevant (wrong) warnings from the netty channel handler
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
http://git-wip-us.apache.org/repos/asf/flink/blob/357b932b/flink-dist/src/main/flink-bin/conf/logback.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml
index 1147a70..f3c4331 100644
--- a/flink-dist/src/main/flink-bin/conf/logback.xml
+++ b/flink-dist/src/main/flink-bin/conf/logback.xml
@@ -25,8 +25,34 @@
</encoder>
</appender>
+ <!-- This affects logging for both user code and Flink -->
<root level="INFO">
<appender-ref ref="file"/>
</root>
-</configuration>
+ <!-- Uncomment this if you want to only change Flink's logging -->
+ <!--<logger name="org.apache.flink" level="INFO">-->
+ <!--<appender-ref ref="file"/>-->
+ <!--</logger>-->
+
+ <!-- The following loggers pin common libraries/connectors to log level INFO.
+ Changing the root logger level does not override these entries; to change
+ their log levels, edit them here. -->
+ <logger name="akka" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.kafka" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.hadoop" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.zookeeper" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+
+ <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
+ <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
+ <appender-ref ref="file"/>
+ </logger>
+</configuration>
[6/7] flink git commit: [FLINK-5207] [logging] Decrease HadoopFileSystem logging
Posted by uc...@apache.org.
[FLINK-5207] [logging] Decrease HadoopFileSystem logging
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28e44e7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28e44e7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28e44e7b
Branch: refs/heads/release-1.1
Commit: 28e44e7bf0fd3aa5c55c74a1c39c22cd77935494
Parents: 8d949c9
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:08:53 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 53 +++++++++-----------
1 file changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28e44e7b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 4e05ebe..f747112 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -17,25 +17,23 @@
*/
package org.apache.flink.runtime.fs.hdfs;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
-
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.UnknownHostException;
/**
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
@@ -99,7 +97,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
+ LOG.debug("Loaded '{}' as HDFS class.", fsClass.getName());
}
}
else {
@@ -114,8 +112,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
{
// first of all, check for a user-defined hdfs class
if (LOG.isDebugEnabled()) {
- LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
- + HDFS_IMPLEMENTATION_KEY + "'.");
+ LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
+ HDFS_IMPLEMENTATION_KEY);
}
Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
@@ -126,12 +124,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
+ LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName() );
}
}
else {
if (LOG.isDebugEnabled()) {
- LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
+ LOG.debug("HDFS class specified by {} is of wrong type.", HDFS_IMPLEMENTATION_KEY);
}
throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
@@ -141,7 +139,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
else {
// load the default HDFS class
if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
+ LOG.debug("Trying to load default HDFS implementation {}.", DEFAULT_HDFS_CLASS);
}
try {
@@ -186,14 +184,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
if (hdfsDefaultPath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
} else {
- LOG.debug("Cannot find hdfs-default configuration file");
+ LOG.trace("{} configuration key for hdfs-default configuration file not set", ConfigConstants.HDFS_DEFAULT_CONFIG);
}
final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
} else {
- LOG.debug("Cannot find hdfs-site configuration file");
+ LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
}
// 2. Approach environment variables
@@ -211,17 +209,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
if (new File(possibleHadoopConfPath).exists()) {
if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
- }
+ } else {
+ LOG.debug("File {}/core-site.xml not found.", possibleHadoopConfPath);
}
+
if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
- }
+ } else {
+ LOG.debug("File {}/hdfs-site.xml not found.", possibleHadoopConfPath);
}
}
}
@@ -285,7 +280,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
if (LOG.isDebugEnabled()) {
- LOG.debug("fs.defaultFS is set to " + configEntry);
+ LOG.debug("fs.defaultFS is set to {}", configEntry);
}
if (configEntry == null) {
@@ -460,7 +455,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
if(clazz != null && LOG.isDebugEnabled()) {
- LOG.debug("Flink supports "+scheme+" with the Hadoop file system wrapper, impl "+clazz);
+ LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
}
return clazz;
}
[2/7] flink git commit: [FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore
Posted by uc...@apache.org.
[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d949c96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d949c96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d949c96
Branch: refs/heads/release-1.1
Commit: 8d949c966b8916213e9b57996aaba0fe7c0e13fa
Parents: ee478fe
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../runtime/jobmanager/SubmittedJobGraph.java | 2 +-
.../ZooKeeperSubmittedJobGraphStore.java | 87 +++++++++++++-------
2 files changed, 57 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8d949c96/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
@Override
public String toString() {
- return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+ return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d949c96/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a1dd14b..7324c07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
*/
private final PathChildrenCache pathCache;
+ /** The full configured base path including the namespace. */
+ private final String zooKeeperFullBasePath;
+
/** The external listener to be notified on races. */
private SubmittedJobGraphListener jobGraphListener;
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
+ this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
synchronized (cacheLock) {
verifyIsRunning();
+ LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
+
List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
while (true) {
@@ -168,6 +174,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
+ LOG.info("Found {} job graphs.", submitted.size());
+
if (submitted.size() != 0) {
List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
@@ -195,6 +203,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
+ LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
synchronized (cacheLock) {
verifyIsRunning();
@@ -221,6 +231,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobGraph, "Job graph");
String path = getPathForJob(jobGraph.getJobId());
+ LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
boolean success = false;
while (!success) {
@@ -235,8 +247,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
addedJobGraphs.add(jobGraph.getJobId());
- LOG.info("Added {} to ZooKeeper.", jobGraph);
-
success = true;
}
catch (KeeperException.NodeExistsException ignored) {
@@ -258,6 +268,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
}
+
+ LOG.info("Added {} to ZooKeeper.", jobGraph);
}
@Override
@@ -265,14 +277,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
+ LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
synchronized (cacheLock) {
if (addedJobGraphs.contains(jobId)) {
jobGraphsInZooKeeper.removeAndDiscardState(path);
addedJobGraphs.remove(jobId);
- LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
}
+
+ LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
/**
@@ -297,70 +312,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
switch (event.getType()) {
- case CHILD_ADDED:
+ case CHILD_ADDED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
synchronized (cacheLock) {
try {
- JobID jobId = fromEvent(event);
if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
try {
// Whoa! This has been added by someone else. Or we were fast
// to remove it (false positive).
jobGraphListener.onAddedJobGraph(jobId);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.error("Error in callback", t);
}
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
}
}
+ }
+ break;
- break;
-
- case CHILD_UPDATED:
+ case CHILD_UPDATED: {
// Nothing to do
- break;
+ }
+ break;
+
+ case CHILD_REMOVED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
- case CHILD_REMOVED:
synchronized (cacheLock) {
try {
- JobID jobId = fromEvent(event);
if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
try {
// Oh oh. Someone else removed one of our job graphs. Mean!
jobGraphListener.onRemovedJobGraph(jobId);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.error("Error in callback", t);
}
}
break;
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
}
}
- break;
+ }
+ break;
- case CONNECTION_SUSPENDED:
+ case CONNECTION_SUSPENDED: {
LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
- "graphs are not monitored (temporarily).");
- break;
- case CONNECTION_LOST:
+ "graphs are not monitored (temporarily).");
+ }
+ break;
+
+ case CONNECTION_LOST: {
LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
- "graphs are not monitored (permanently).");
- break;
+ "graphs are not monitored (permanently).");
+ }
+ break;
- case CONNECTION_RECONNECTED:
+ case CONNECTION_RECONNECTED: {
LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
- "graphs are monitored again.");
- break;
- case INITIALIZED:
+ "graphs are monitored again.");
+ }
+ break;
+
+ case INITIALIZED: {
LOG.info("SubmittedJobGraphsPathCacheListener initialized");
- break;
+ }
+ break;
}
}
[7/7] flink git commit: [FLINK-5198] [logging] Improve TaskState toString
Posted by uc...@apache.org.
[FLINK-5198] [logging] Improve TaskState toString
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee478fe2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee478fe2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee478fe2
Branch: refs/heads/release-1.1
Commit: ee478fe278d5893048f015c2c1cc6dbfc7c68d8b
Parents: 7b9a444
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:15:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/checkpoint/CheckpointCoordinator.java | 8 ++++++--
.../org/apache/flink/runtime/checkpoint/TaskState.java | 12 ++++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee478fe2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0cf944c..24cc3cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -693,9 +693,13 @@ public class CheckpointCoordinator {
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
- for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
- builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+ builder.append("Checkpoint state: ");
+ for (TaskState state : completed.getTaskStates().values()) {
+ builder.append(state);
+ builder.append(", ");
}
+ // Remove last two chars ", "
+ builder.delete(builder.length() - 2, builder.length());
LOG.debug(builder.toString());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee478fe2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index ac4503d..14f8caa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -168,4 +168,16 @@ public class TaskState implements Serializable {
public int hashCode() {
return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
}
+
+ @Override
+ public String toString() {
+ // KvStates are always null in 1.1. Don't print this as it might
+ // confuse users that don't care about how we store it internally.
+ return "TaskState(" +
+ "jobVertexID: " + jobVertexID +
+ ", parallelism: " + parallelism +
+ ", sub task states: " + subtaskStates.size() +
+ ", total size (bytes): " + getStateSize() +
+ ')';
+ }
}
[3/7] flink git commit: [FLINK-5201] [logging] Log loaded config properties on INFO level
Posted by uc...@apache.org.
[FLINK-5201] [logging] Log loaded config properties on INFO level
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8ade638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8ade638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8ade638
Branch: refs/heads/release-1.1
Commit: c8ade638b79fcf6816c386c1070c81b83bd18916
Parents: ffe6b6b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:00:02 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/configuration/GlobalConfiguration.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8ade638/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 7e50486..14a6ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -253,7 +253,7 @@ public final class GlobalConfiguration {
continue;
}
- LOG.debug("Loading configuration property: {}, {}", key, value);
+ LOG.info("Loading configuration property: {}, {}", key, value);
this.config.setString(key, value);
}
@@ -372,7 +372,7 @@ public final class GlobalConfiguration {
if (key != null && value != null) {
// Put key, value pair into the map
- LOG.debug("Loading configuration property: {}, {}", key, value);
+ LOG.info("Loading configuration property: {}, {}", key, value);
this.config.setString(key, value);
} else {
LOG.warn("Error while reading configuration: Cannot read property " + propNumber);
[4/7] flink git commit: [FLINK-5194] [logging] Log heartbeats on TRACE level
Posted by uc...@apache.org.
[FLINK-5194] [logging] Log heartbeats on TRACE level
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffe6b6b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffe6b6b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffe6b6b5
Branch: refs/heads/release-1.1
Commit: ffe6b6b595cf80d4682e273803930f62139af9c0
Parents: 569a966
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:15:27 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/akka/FlinkUntypedActor.java | 14 ++++++--------
.../flink/runtime/instance/InstanceManager.java | 4 +---
.../apache/flink/runtime/jobmanager/JobManager.scala | 2 +-
3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe6b6b5/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 5100d17..3255778 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -19,10 +19,8 @@
package org.apache.flink.runtime.akka;
import akka.actor.UntypedActor;
-
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +38,7 @@ import java.util.UUID;
* a leader session ID option which is returned by getLeaderSessionID.
*/
public abstract class FlinkUntypedActor extends UntypedActor {
-
+
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
@@ -56,16 +54,16 @@ public abstract class FlinkUntypedActor extends UntypedActor {
*/
@Override
public final void onReceive(Object message) throws Exception {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
long start = System.nanoTime();
handleLeaderSessionID(message);
- long duration = (System.nanoTime() - start)/ 1000000;
+ long duration = (System.nanoTime() - start)/ 1_000_000;
- LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+ LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
} else {
handleLeaderSessionID(message);
}
@@ -81,7 +79,7 @@ public abstract class FlinkUntypedActor extends UntypedActor {
* @throws Exception
*/
private void handleLeaderSessionID(Object message) throws Exception {
- if(message instanceof LeaderSessionMessage) {
+ if (message instanceof LeaderSessionMessage) {
LeaderSessionMessage msg = (LeaderSessionMessage) message;
UUID expectedID = getLeaderSessionID();
UUID actualID = msg.leaderSessionID();
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe6b6b5/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0d0d4c7..3fe92a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -124,9 +124,7 @@ public class InstanceManager {
host.reportHeartBeat();
host.setMetricsReport(lastMetricsReport);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat from TaskManager " + host);
- }
+ LOG.trace("Received heartbeat from TaskManager {}", host);
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe6b6b5/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cf60d4e..9061db4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -927,7 +927,7 @@ class JobManager(
)
case Heartbeat(instanceID, metricsReport, accumulators) =>
- log.debug(s"Received heartbeat message from $instanceID.")
+ log.trace(s"Received heartbeat message from $instanceID.")
updateAccumulators(accumulators)
[5/7] flink git commit: [FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor
Posted by uc...@apache.org.
[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b9a4445
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b9a4445
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b9a4445
Branch: refs/heads/release-1.1
Commit: 7b9a4445981ac8993af7a53cf057666e78c92140
Parents: c8ade63
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 3 ---
.../partition/consumer/SingleInputGate.java | 18 +++++++++++++++---
2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7b9a4445/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 6b87e69..24b95ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -135,8 +134,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
consumedPartitionId, partitionLocation);
}
- LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
return icdd;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7b9a4445/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f44fbc..1550b0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -573,8 +572,11 @@ public class SingleInputGate implements InputGate {
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
- for (int i = 0; i < inputChannels.length; i++) {
+ int numLocalChannels = 0;
+ int numRemoteChannels = 0;
+ int numUnknownChannels = 0;
+ for (int i = 0; i < inputChannels.length; i++) {
final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
@@ -585,6 +587,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
+
+ numLocalChannels++;
}
else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -593,6 +597,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
+
+ numRemoteChannels++;
}
else if (partitionLocation.isUnknown()) {
inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -602,6 +608,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
metrics
);
+
+ numUnknownChannels++;
}
else {
throw new IllegalStateException("Unexpected partition location.");
@@ -610,7 +618,11 @@ public class SingleInputGate implements InputGate {
inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
}
- LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+ LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+ inputChannels.length,
+ numLocalChannels,
+ numRemoteChannels,
+ numUnknownChannels);
return inputGate;
}