You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:46 UTC
[1/7] flink git commit: [FLINK-5192] [logging] Improve log config
templates
Repository: flink
Updated Branches:
refs/heads/master 3223a160f -> 7d66aaeb0
[FLINK-5192] [logging] Improve log config templates
This closes #2899.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d66aaeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d66aaeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d66aaeb
Branch: refs/heads/master
Commit: 7d66aaeb0fb52c5cd258e8b32ba8394eedd5d4ca
Parents: bf859e7
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:14:23 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../src/main/flink-bin/conf/log4j.properties | 14 +++++++++-
flink-dist/src/main/flink-bin/conf/logback.xml | 28 +++++++++++++++++++-
2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d66aaeb/flink-dist/src/main/flink-bin/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index 97ec653..8e00ce3 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -16,8 +16,20 @@
# limitations under the License.
################################################################################
+# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file
+# Uncomment this if you want to _only_ change Flink's logging
+#log4j.logger.org.apache.flink=INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+log4j.logger.akka=INFO
+log4j.logger.org.apache.kafka=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
+
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
@@ -25,5 +37,5 @@ log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-# suppress the irrelevant (wrong) warnings from the netty channel handler
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
http://git-wip-us.apache.org/repos/asf/flink/blob/7d66aaeb/flink-dist/src/main/flink-bin/conf/logback.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml
index 1147a70..f3c4331 100644
--- a/flink-dist/src/main/flink-bin/conf/logback.xml
+++ b/flink-dist/src/main/flink-bin/conf/logback.xml
@@ -25,8 +25,34 @@
</encoder>
</appender>
+ <!-- This affects logging for both user code and Flink -->
<root level="INFO">
<appender-ref ref="file"/>
</root>
-</configuration>
+ <!-- Uncomment this if you want to only change Flink's logging -->
+ <!--<logger name="org.apache.flink" level="INFO">-->
+ <!--<appender-ref ref="file"/>-->
+ <!--</logger>-->
+
+ <!-- The following lines keep the log level of common libraries/connectors on
+ log level INFO. The root logger does not override this. You have to manually
+ change the log levels here. -->
+ <logger name="akka" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.kafka" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.hadoop" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+ <logger name="org.apache.zookeeper" level="INFO">
+ <appender-ref ref="file"/>
+ </logger>
+
+ <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
+ <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
+ <appender-ref ref="file"/>
+ </logger>
+</configuration>
[2/7] flink git commit: [FLINK-5199] [logging] Improve logging in
ZooKeeperSubmittedJobGraphStore
Posted by uc...@apache.org.
[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f91dd9fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f91dd9fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f91dd9fb
Branch: refs/heads/master
Commit: f91dd9fbaf392bc2968e974dddba4cda2a4f3be2
Parents: dc7d8ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../runtime/jobmanager/SubmittedJobGraph.java | 2 +-
.../ZooKeeperSubmittedJobGraphStore.java | 86 +++++++++++++-------
2 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
@Override
public String toString() {
- return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+ return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index b241712..c1dc656 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
*/
private final PathChildrenCache pathCache;
+ /** The full configured base path including the namespace. */
+ private final String zooKeeperFullBasePath;
+
/** The external listener to be notified on races. */
private SubmittedJobGraphListener jobGraphListener;
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
+ this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
synchronized (cacheLock) {
verifyIsRunning();
+ LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
while (true) {
@@ -168,6 +173,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
+ LOG.info("Found {} job graphs.", submitted.size());
+
if (submitted.size() != 0) {
List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
@@ -193,6 +200,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
+ LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
synchronized (cacheLock) {
verifyIsRunning();
@@ -215,6 +224,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobGraph, "Job graph");
String path = getPathForJob(jobGraph.getJobId());
+ LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
boolean success = false;
while (!success) {
@@ -229,8 +240,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
addedJobGraphs.add(jobGraph.getJobId());
- LOG.info("Added {} to ZooKeeper.", jobGraph);
-
success = true;
}
catch (KeeperException.NodeExistsException ignored) {
@@ -252,6 +261,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
}
+
+ LOG.info("Added {} to ZooKeeper.", jobGraph);
}
@Override
@@ -259,14 +270,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
+ LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
synchronized (cacheLock) {
if (addedJobGraphs.contains(jobId)) {
jobGraphsInZooKeeper.removeAndDiscardState(path);
addedJobGraphs.remove(jobId);
- LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
}
+
+ LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
/**
@@ -291,70 +305,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
switch (event.getType()) {
- case CHILD_ADDED:
+ case CHILD_ADDED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
synchronized (cacheLock) {
try {
- JobID jobId = fromEvent(event);
if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
try {
// Whoa! This has been added by someone else. Or we were fast
// to remove it (false positive).
jobGraphListener.onAddedJobGraph(jobId);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.error("Error in callback", t);
}
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
}
}
+ }
+ break;
- break;
-
- case CHILD_UPDATED:
+ case CHILD_UPDATED: {
// Nothing to do
- break;
+ }
+ break;
+
+ case CHILD_REMOVED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
- case CHILD_REMOVED:
synchronized (cacheLock) {
try {
- JobID jobId = fromEvent(event);
if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
try {
// Oh oh. Someone else removed one of our job graphs. Mean!
jobGraphListener.onRemovedJobGraph(jobId);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.error("Error in callback", t);
}
}
break;
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
}
}
- break;
+ }
+ break;
- case CONNECTION_SUSPENDED:
+ case CONNECTION_SUSPENDED: {
LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
- "graphs are not monitored (temporarily).");
- break;
- case CONNECTION_LOST:
+ "graphs are not monitored (temporarily).");
+ }
+ break;
+
+ case CONNECTION_LOST: {
LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
- "graphs are not monitored (permanently).");
- break;
+ "graphs are not monitored (permanently).");
+ }
+ break;
- case CONNECTION_RECONNECTED:
+ case CONNECTION_RECONNECTED: {
LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
- "graphs are monitored again.");
- break;
- case INITIALIZED:
+ "graphs are monitored again.");
+ }
+ break;
+
+ case INITIALIZED: {
LOG.info("SubmittedJobGraphsPathCacheListener initialized");
- break;
+ }
+ break;
}
}
[4/7] flink git commit: [FLINK-5194] [logging] Log heartbeats on
TRACE level
Posted by uc...@apache.org.
[FLINK-5194] [logging] Log heartbeats on TRACE level
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8228ac6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8228ac6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8228ac6e
Branch: refs/heads/master
Commit: 8228ac6e4d4e047e413ec316fc2ee3f5022b9afd
Parents: 3223a16
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:15:27 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/akka/FlinkUntypedActor.java | 14 ++++++--------
.../flink/runtime/instance/InstanceManager.java | 4 +---
.../apache/flink/runtime/jobmanager/JobManager.scala | 2 +-
3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8228ac6e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 5100d17..3255778 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -19,10 +19,8 @@
package org.apache.flink.runtime.akka;
import akka.actor.UntypedActor;
-
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +38,7 @@ import java.util.UUID;
* a leader session ID option which is returned by getLeaderSessionID.
*/
public abstract class FlinkUntypedActor extends UntypedActor {
-
+
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
@@ -56,16 +54,16 @@ public abstract class FlinkUntypedActor extends UntypedActor {
*/
@Override
public final void onReceive(Object message) throws Exception {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
long start = System.nanoTime();
handleLeaderSessionID(message);
- long duration = (System.nanoTime() - start)/ 1000000;
+ long duration = (System.nanoTime() - start)/ 1_000_000;
- LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+ LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
} else {
handleLeaderSessionID(message);
}
@@ -81,7 +79,7 @@ public abstract class FlinkUntypedActor extends UntypedActor {
* @throws Exception
*/
private void handleLeaderSessionID(Object message) throws Exception {
- if(message instanceof LeaderSessionMessage) {
+ if (message instanceof LeaderSessionMessage) {
LeaderSessionMessage msg = (LeaderSessionMessage) message;
UUID expectedID = getLeaderSessionID();
UUID actualID = msg.leaderSessionID();
http://git-wip-us.apache.org/repos/asf/flink/blob/8228ac6e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 65909db..1c2d66f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -118,9 +118,7 @@ public class InstanceManager {
host.reportHeartBeat();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat from TaskManager " + host);
- }
+ LOG.trace("Received heartbeat from TaskManager {}", host);
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8228ac6e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7cddb2b..0ffca55 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1031,7 +1031,7 @@ class JobManager(
)
case Heartbeat(instanceID, accumulators) =>
- log.debug(s"Received heartbeat message from $instanceID.")
+ log.trace(s"Received heartbeat message from $instanceID.")
updateAccumulators(accumulators)
[6/7] flink git commit: [FLINK-5207] [logging] Decrease
HadoopFileSystem logging
Posted by uc...@apache.org.
[FLINK-5207] [logging] Decrease HadoopFileSystem logging
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf859e77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf859e77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf859e77
Branch: refs/heads/master
Commit: bf859e77abb2f1ee14b0bdf18cdb2fe526369203
Parents: f91dd9f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 17:08:53 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 53 +++++++++-----------
1 file changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf859e77/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 5d7173b..0eab032 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -17,25 +17,23 @@
*/
package org.apache.flink.runtime.fs.hdfs;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.UnknownHostException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
-
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.UnknownHostException;
/**
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
@@ -99,7 +97,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
+ LOG.debug("Loaded '{}' as HDFS class.", fsClass.getName());
}
}
else {
@@ -114,8 +112,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
{
// first of all, check for a user-defined hdfs class
if (LOG.isDebugEnabled()) {
- LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
- + HDFS_IMPLEMENTATION_KEY + "'.");
+ LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
+ HDFS_IMPLEMENTATION_KEY);
}
Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
@@ -126,12 +124,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
+ LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName() );
}
}
else {
if (LOG.isDebugEnabled()) {
- LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
+ LOG.debug("HDFS class specified by {} is of wrong type.", HDFS_IMPLEMENTATION_KEY);
}
throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
@@ -141,7 +139,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
else {
// load the default HDFS class
if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
+ LOG.debug("Trying to load default HDFS implementation {}.", DEFAULT_HDFS_CLASS);
}
try {
@@ -190,14 +188,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
if (hdfsDefaultPath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
} else {
- LOG.debug("Cannot find hdfs-default configuration file");
+ LOG.trace("{} configuration key for hdfs-default configuration file not set", ConfigConstants.HDFS_DEFAULT_CONFIG);
}
final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
} else {
- LOG.debug("Cannot find hdfs-site configuration file");
+ LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
}
// 2. Approach environment variables
@@ -215,17 +213,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
if (new File(possibleHadoopConfPath).exists()) {
if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
- }
+ } else {
+ LOG.debug("File {}/core-site.xml not found.", possibleHadoopConfPath);
}
+
if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
- }
+ } else {
+ LOG.debug("File {}/hdfs-site.xml not found.", possibleHadoopConfPath);
}
}
}
@@ -289,7 +284,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
}
if (LOG.isDebugEnabled()) {
- LOG.debug("fs.defaultFS is set to " + configEntry);
+ LOG.debug("fs.defaultFS is set to {}", configEntry);
}
if (configEntry == null) {
@@ -464,7 +459,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
if(clazz != null && LOG.isDebugEnabled()) {
- LOG.debug("Flink supports "+scheme+" with the Hadoop file system wrapper, impl "+clazz);
+ LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
}
return clazz;
}
[5/7] flink git commit: [FLINK-5196] [logging] Don't log
InputChannelDeploymentDescriptor
Posted by uc...@apache.org.
[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67bd8277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67bd8277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67bd8277
Branch: refs/heads/master
Commit: 67bd8277d1dc1179c30d2dbad0922122ed6f49ee
Parents: dc5650a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 3 ---
.../partition/consumer/SingleInputGate.java | 18 +++++++++++++++---
2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index a72b92f..9b3ce5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -143,8 +142,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
consumedPartitionId, partitionLocation);
}
- LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
return icdd;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f57542..d7ed33c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -554,8 +553,11 @@ public class SingleInputGate implements InputGate {
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
- for (int i = 0; i < inputChannels.length; i++) {
+ int numLocalChannels = 0;
+ int numRemoteChannels = 0;
+ int numUnknownChannels = 0;
+ for (int i = 0; i < inputChannels.length; i++) {
final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
@@ -567,6 +569,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
+
+ numLocalChannels++;
}
else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -576,6 +580,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
+
+ numRemoteChannels++;
}
else if (partitionLocation.isUnknown()) {
inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -586,6 +592,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
+
+ numUnknownChannels++;
}
else {
throw new IllegalStateException("Unexpected partition location.");
@@ -594,7 +602,11 @@ public class SingleInputGate implements InputGate {
inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
}
- LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+ LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+ inputChannels.length,
+ numLocalChannels,
+ numRemoteChannels,
+ numUnknownChannels);
return inputGate;
}
[7/7] flink git commit: [FLINK-5198] [logging] Improve TaskState
toString
Posted by uc...@apache.org.
[FLINK-5198] [logging] Improve TaskState toString
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc7d8ec2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc7d8ec2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc7d8ec2
Branch: refs/heads/master
Commit: dc7d8ec2c4d03c42e3d582947a3fe39a274d7f4b
Parents: 67bd827
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:15:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/CheckpointCoordinator.java | 8 ++++++--
.../apache/flink/runtime/checkpoint/TaskState.java | 15 +++++++++++++--
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d8ec2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 638e0a7..2242c14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -653,9 +653,13 @@ public class CheckpointCoordinator {
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
- for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
- builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+ builder.append("Checkpoint state: ");
+ for (TaskState state : completed.getTaskStates().values()) {
+ builder.append(state);
+ builder.append(", ");
}
+ // Remove last two chars ", "
+ builder.delete(builder.length() - 2, builder.length());
LOG.debug(builder.toString());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d8ec2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 3cdc5e9..76f1c51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -130,7 +129,7 @@ public class TaskState implements StateObject {
@Override
- public long getStateSize() throws IOException {
+ public long getStateSize() {
long result = 0L;
for (int i = 0; i < parallelism; i++) {
@@ -164,4 +163,16 @@ public class TaskState implements StateObject {
public Map<Integer, SubtaskState> getSubtaskStates() {
return Collections.unmodifiableMap(subtaskStates);
}
+
+ @Override
+ public String toString() {
+ // KvStates are always null in 1.1. Don't print this as it might
+ // confuse users that don't care about how we store it internally.
+ return "TaskState(" +
+ "jobVertexID: " + jobVertexID +
+ ", parallelism: " + parallelism +
+ ", sub task states: " + subtaskStates.size() +
+ ", total size (bytes): " + getStateSize() +
+ ')';
+ }
}
[3/7] flink git commit: [FLINK-5201] [logging] Log loaded config
properties on INFO level
Posted by uc...@apache.org.
[FLINK-5201] [logging] Log loaded config properties on INFO level
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc5650a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc5650a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc5650a3
Branch: refs/heads/master
Commit: dc5650a3ea15b950aa906e456960e1cf7c94d3a0
Parents: 8228ac6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:00:02 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/configuration/GlobalConfiguration.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dc5650a3/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 8d550d7..ecfbc72 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -144,7 +144,7 @@ public final class GlobalConfiguration {
continue;
}
- LOG.debug("Loading configuration property: {}, {}", key, value);
+ LOG.info("Loading configuration property: {}, {}", key, value);
config.setString(key, value);
}
}