You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/05 11:57:57 UTC

[4/5] [FLINK-1086] Replace JCL with SLF4J and Log4j with Logback

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index b0799cd..efb1e94 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -36,9 +36,6 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.util.ClusterUtil;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -86,11 +83,6 @@ public class StreamComponentTest {
 	private static final int SOURCE_PARALELISM = 1;
 	private static final long MEMORYSIZE = 32;
 
-	@Before
-	public void before() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF);
-	}
-
 	@Ignore
 	@Test
 	public void wrongJobGraph() {
@@ -193,9 +185,6 @@ public class StreamComponentTest {
 
 	@Test
 	public void runStream() {
-
-		LogUtils.initializeDefaultTestConsoleLogger();
-
 		LocalStreamEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(SOURCE_PARALELISM);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml b/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 292d59a..adced76 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -44,8 +44,8 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -74,11 +74,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
 
 /**
  * All classes in this package contain code taken from
@@ -94,7 +89,7 @@ import org.apache.log4j.PatternLayout;
  *
  */
 public class Client {
-	private static final Log LOG = LogFactory.getLog(Client.class);
+	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
 
 	/**
 	 * Command Line argument options
@@ -202,21 +197,6 @@ public class Client {
 			System.exit(1);
 		}
 
-		if (System.getProperty("log4j.configuration") == null) {
-			Logger root = Logger.getRootLogger();
-			root.removeAllAppenders();
-			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-			root.addAppender(appender);
-			if(cmd.hasOption(VERBOSE.getOpt())) {
-				root.setLevel(Level.DEBUG);
-				LOG.debug("CLASSPATH: "+System.getProperty("java.class.path"));
-			} else {
-				root.setLevel(Level.INFO);
-			}
-		}
-
-
 		// Jar Path
 		Path localJarPath;
 		if(cmd.hasOption(FLINK_JAR.getOpt())) {
@@ -243,7 +223,7 @@ public class Client {
 			confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt())+"/";
 			File confFile = new File(confDirPath+CONFIG_FILE_NAME);
 			if(!confFile.exists()) {
-				LOG.fatal("Unable to locate configuration file in "+confFile);
+				LOG.error("Unable to locate configuration file in "+confFile);
 				System.exit(1);
 			}
 			confPath = new Path(confFile.getAbsolutePath());
@@ -290,13 +270,13 @@ public class Client {
 				LOG.warn("Ship directory is not a directory!");
 			}
 		}
-		boolean hasLog4j = false;
-		//check if there is a log4j file
+		boolean hasLogback = false;
+		//check if there is a logback file
 		if(confDirPath.length() > 0) {
-			File l4j = new File(confDirPath+"/log4j.properties");
-			if(l4j.exists()) {
-				shipFiles.add(l4j);
-				hasLog4j = true;
+			File logback = new File(confDirPath+"/logback.xml");
+			if(logback.exists()) {
+				shipFiles.add(logback);
+				hasLogback = true;
 			}
 		}
 
@@ -373,7 +353,7 @@ public class Client {
 			showClusterMetrics(yarnClient);
 		}
 		if(!cmd.hasOption(CONTAINER.getOpt())) {
-			LOG.fatal("Missing required argument "+CONTAINER.getOpt());
+			LOG.error("Missing required argument "+CONTAINER.getOpt());
 			printUsage();
 			yarnClient.stop();
 			System.exit(1);
@@ -395,13 +375,13 @@ public class Client {
 		GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
 		Resource maxRes = appResponse.getMaximumResourceCapability();
 		if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
-			LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\n"
+			LOG.error("The cluster does not have the requested resources for the TaskManagers available!\n"
 					+ "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+tmCores);
 			yarnClient.stop();
 			System.exit(1);
 		}
 		if(jmMemory > maxRes.getMemory() ) {
-			LOG.fatal("The cluster does not have the requested resources for the JobManager available!\n"
+			LOG.error("The cluster does not have the requested resources for the JobManager available!\n"
 					+ "Maximum Memory: "+maxRes.getMemory());
 			yarnClient.stop();
 			System.exit(1);
@@ -409,19 +389,19 @@ public class Client {
 		int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
 		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
 		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-			LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
+			LOG.error("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
 					+ "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
 			yarnClient.stop();
 			System.exit(1);
 		}
 		if( tmMemory > freeClusterMem.containerLimit) {
-			LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
+			LOG.error("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
 					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
 			yarnClient.stop();
 			System.exit(1);
 		}
 		if( jmMemory > freeClusterMem.containerLimit) {
-			LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
+			LOG.error("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
 					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
 			yarnClient.stop();
 			System.exit(1);
@@ -436,8 +416,9 @@ public class Client {
 
 		String amCommand = "$JAVA_HOME/bin/java"
 					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
-		if(hasLog4j) {
-			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+		if(hasLogback) {
+			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-logback.log\" " +
+					"-Dlogback.configurationFile=file:logback.xml";
 		}
 		amCommand 	+= " "+ApplicationMaster.class.getName()+" "
 					+ " 1>"
@@ -777,7 +758,7 @@ public class Client {
 		try {
 			jar = new JarFile(localJarPath.toUri().getPath());
 		} catch(FileNotFoundException fne) {
-			LOG.fatal("Unable to access jar file. Specify jar file or configuration file.", fne);
+			LOG.error("Unable to access jar file. Specify jar file or configuration file.", fne);
 			System.exit(1);
 		}
 		InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
index 1c87289..07762cc 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
@@ -23,8 +23,8 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.ipc.RPC;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
@@ -33,7 +33,7 @@ import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
 
 
 public class ClientMasterControl extends Thread {
-	private static final Log LOG = LogFactory.getLog(ClientMasterControl.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ClientMasterControl.class);
 
 	private InetSocketAddress applicationMasterAddress;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index bd2cb9e..01539f8 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -34,8 +34,8 @@ import java.util.Map;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.hadoop.conf.Configuration;
@@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class Utils {
 	
-	private static final Log LOG = LogFactory.getLog(Utils.class);
+	private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 	private static final int HEAP_LIMIT_CAP = 500;
 	
 
@@ -163,7 +163,7 @@ public class Utils {
 		try {
 			hadoopHome = Shell.getHadoopHome();
 		} catch (IOException e) {
-			LOG.fatal("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
+			LOG.error("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
 			System.exit(1);
 		}
 		File tryConf = new File(hadoopHome+"/etc/hadoop");
@@ -181,10 +181,10 @@ public class Utils {
 	}
 	
 	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
+		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
 		for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
 			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
 		}
-		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
 	}
 	
 	
@@ -236,7 +236,7 @@ public class Utils {
 		amContainer.setTokens(securityTokens);
 	}
 	
-	public static void logFilesInCurrentDirectory(final Log logger) {
+	public static void logFilesInCurrentDirectory(final Logger logger) {
 		new File(".").list(new FilenameFilter() {
 			
 			@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
index 30d2f30..afda8e0 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -34,8 +34,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
@@ -78,7 +78,7 @@ import com.google.common.base.Preconditions;
 
 public class ApplicationMaster implements YARNClientMasterProtocol {
 
-	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ApplicationMaster.class);
 
 	private final String currDir;
 	private final String logDirs;
@@ -132,9 +132,9 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	private List<Message> messages = new SerializableArrayList<Message>();
 
 	/**
-	 * Indicates if a log4j config file is being shipped.
+	 * Indicates if a logback config file is being shipped.
 	 */
-	private boolean hasLog4j;
+	private boolean hasLogback;
 	
 	/**
 	 * Heap size of TaskManager containers in MB.
@@ -332,7 +332,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		LOG.info("Prepared local resource for modified yaml: "+flinkConf);
 
 
-		hasLog4j = new File(currDir+"/log4j.properties").exists();
+		hasLogback = new File(currDir+"/logback.xml").exists();
 		// prepare the files to ship
 		LocalResource[] remoteShipRsc = null;
 		String[] remoteShipPaths = shipListString.split(",");
@@ -416,8 +416,9 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
 
 				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
-				if(hasLog4j) {
-					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+				if(hasLogback) {
+					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-logback" +
+							".log\" -Dlogback.configurationFile=file:logback.xml";
 				}
 				tmCommand	+= " "+YarnTaskManagerRunner.class.getName()+" -configDir . "
 						+ " 1>"
@@ -585,7 +586,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 					am.setRMClient(rmClient);
 					am.run();
 				} catch (Throwable e) {
-					LOG.fatal("Error while running the application master", e);
+					LOG.error("Error while running the application master", e);
 					// the AM is not available. Report error through the unregister function.
 					if(rmClient != null && am == null) {
 						try {
@@ -593,13 +594,13 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 									+ " stopped unexpectedly with an exception.\n"
 									+ StringUtils.stringifyException(e), "");
 						} catch (Exception e1) {
-							LOG.fatal("Unable to fail the application master", e1);
+							LOG.error("Unable to fail the application master", e1);
 						}
 						LOG.info("AM unregistered from RM");
 						return null;
 					}
 					if(rmClient == null) {
-						LOG.fatal("Unable to unregister AM since the RM client is not available");
+						LOG.error("Unable to unregister AM since the RM client is not available");
 					}
 					if(am != null) {
 						LOG.info("Writing error into internal message system");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 0b0b1e4..ba0edc6 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -23,8 +23,8 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.yarn.Client;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 
 public class YarnTaskManagerRunner {
 
-	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
 
 	public static void main(final String[] args) throws IOException {
 		Map<String, String> envs = System.getenv();
@@ -59,7 +59,7 @@ public class YarnTaskManagerRunner {
 				try {
 					TaskManager.main(newArgs);
 				} catch (Exception e) {
-					LOG.fatal("Error while running the TaskManager", e);
+					LOG.error("Error while running the TaskManager", e);
 				}
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 52eda29..d637c7e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -28,8 +28,6 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.compiler.DataStatistics;
@@ -77,10 +75,6 @@ public class LocalExecutor extends PlanExecutor {
 		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
 			throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
 		}
-		
-		if (System.getProperty("log4j.configuration") == null) {
-			setLoggingLevel(Level.INFO);
-		}
 	}
 
 	public int getJobManagerRpcPort() {
@@ -339,14 +333,7 @@ public class LocalExecutor extends PlanExecutor {
 		List<DataSinkNode> sinks = PactCompiler.createPreOptimizedPlan(plan);
 		return gen.getPactPlanAsJSON(sinks);
 	}
-	
-	/**
-	 * Utility method for logging
-	 */
-	public static void setLoggingLevel(Level lvl) {
-		LogUtils.initializeDefaultConsoleLogger(lvl);
-	}
-	
+
 	/**
 	 * By default, local environments do not overwrite existing files.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
index 53c4dac..7334578 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.client;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.client.web.WebInterfaceServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -35,7 +35,7 @@ public class WebFrontend {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(WebFrontend.class);
+	private static final Logger LOG = LoggerFactory.getLogger(WebFrontend.class);
 
 	/**
 	 * Main method. accepts a single parameter, which is the config directory.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index 454c543..db52adc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -21,8 +21,8 @@ package org.apache.flink.client.minicluster;
 
 import java.lang.reflect.Method;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 
 public class NepheleMiniCluster {
 	
-	private static final Log LOG = LogFactory.getLog(NepheleMiniCluster.class);
+	private static final Logger LOG = LoggerFactory.getLogger(NepheleMiniCluster.class);
 	
 	private static final int DEFAULT_JM_RPC_PORT = 6498;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index a82b9d5..88ddc9d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -27,8 +27,8 @@ import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.compiler.CompilerException;
@@ -58,7 +58,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class Client {
 	
-	private static final Log LOG = LogFactory.getLog(Client.class);
+	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
 	
 	
 	private final Configuration configuration;	// the configuration describing the job manager address

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 844930c..52d22f0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -35,8 +35,6 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -45,6 +43,8 @@ import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class JobSubmissionServlet extends HttpServlet {
@@ -74,7 +74,7 @@ public class JobSubmissionServlet extends HttpServlet {
 
 	private static final String SUSPEND_PARAM_NAME = "suspend";
 
-	private static final Log LOG = LogFactory.getLog(JobSubmissionServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobSubmissionServlet.class);
 
 	// ------------------------------------------------------------------------
 
@@ -86,7 +86,7 @@ public class JobSubmissionServlet extends HttpServlet {
 
 	private final Random rand;							// random number generator for UID
 	
-	private final Configuration nepheleConfig;
+	private final Configuration nepheleConfig;
 
 
 	public JobSubmissionServlet(Configuration nepheleConfig, File jobDir, File planDir) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
index 1ca7408..570f222 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
@@ -28,8 +28,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.client.program.PackagedProgram;
 
 
@@ -38,7 +38,7 @@ public class PactJobJSONServlet extends HttpServlet {
 	/** Serial UID for serialization interoperability. */
 	private static final long serialVersionUID = 558077298726449201L;
 	
-	private static final Log LOG = LogFactory.getLog(PactJobJSONServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(PactJobJSONServlet.class);
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index a7da0f9..af1725c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -23,8 +23,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -48,7 +48,7 @@ public class WebInterfaceServer {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(WebInterfaceServer.class);
+	private static final Logger LOG = LoggerFactory.getLogger(WebInterfaceServer.class);
 
 	/**
 	 * The jetty server serving all requests.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index de5a3e1..7ba5dcc 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -28,8 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.Operator;
@@ -321,7 +321,7 @@ public class PactCompiler {
 	/**
 	 * The log handle that is used by the compiler to log messages.
 	 */
-	public static final Log LOG = LogFactory.getLog(PactCompiler.class);
+	public static final Logger LOG = LoggerFactory.getLogger(PactCompiler.class);
 
 	// ------------------------------------------------------------------------
 	// Members

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index fcf9f8d..043a0a7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -103,7 +103,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	
 	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
 	
-//	private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class);
+//	private static final Logger LOG = LoggerFactory.getLogger(NepheleJobGraphGenerator.class);
 	
 	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index 0115c31..a13e4a8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -37,11 +37,9 @@ import org.apache.flink.compiler.costs.DefaultCostEstimator;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.PlanNode;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
-import org.apache.flink.util.LogUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Visitor;
 import org.junit.Before;
-import org.junit.BeforeClass;
 
 /**
  *
@@ -72,11 +70,6 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	
 	// ------------------------------------------------------------------------	
 	
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-	
 	@Before
 	public void setup() {
 		this.dataStats = new DataStatistics();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-compiler/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/resources/logback-test.xml b/flink-compiler/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-compiler/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index cd338c9..e0c0a2f 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -33,7 +33,7 @@ under the License.
 	<name>flink-core</name>
 
 	<packaging>jar</packaging>
-	
+
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index ec63da7..4216f20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -25,8 +25,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.BlockLocation;
@@ -50,7 +50,7 @@ public abstract class BinaryInputFormat<T extends IOReadableWritable> extends Fi
 	/**
 	 * The log.
 	 */
-	private static final Log LOG = LogFactory.getLog(BinaryInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class);
 
 	/**
 	 * The config parameter which defines the fixed length of a record.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index b6ad8b3..63a394f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -25,8 +25,8 @@ import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.ArrayList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.base.FileDataSourceBase;
 import org.apache.flink.configuration.ConfigConstants;
@@ -55,7 +55,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	/**
 	 * The log.
 	 */
-	private static final Log LOG = LogFactory.getLog(DelimitedInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
 	
 	/**
 	 * The default read buffer size = 1MB.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 9a674a2..ccd6c51 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -26,8 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.configuration.ConfigConstants;
@@ -53,7 +53,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
 	
 	// -------------------------------------- Constants -------------------------------------------
 	
-	private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FileInputFormat.class);
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index ad771be..4331116 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -21,8 +21,8 @@ package org.apache.flink.api.common.io;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.operators.base.FileDataSinkBase;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -83,7 +83,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
 	/**
 	 * The LOG for logging messages in this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(FileOutputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FileOutputFormat.class);
 	
 	/**
 	 * The key under which the name of the target path is stored in the configuration. 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index c91e6ab..81c6679 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -23,8 +23,8 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
@@ -281,7 +281,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 	@SuppressWarnings("serial")
 	public static class TerminationCriterionAggregationConvergence implements ConvergenceCriterion<LongValue> {
 
-		private static final Log log = LogFactory.getLog(TerminationCriterionAggregationConvergence.class);
+		private static final Logger log = LoggerFactory.getLogger(TerminationCriterionAggregationConvergence.class);
 
 		@Override
 		public boolean isConverged(int iteration, LongValue countAggregate) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index c9de712..648b9b5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -33,8 +33,8 @@ import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.util.StringUtils;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -54,7 +54,7 @@ public final class GlobalConfiguration {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(GlobalConfiguration.class);
+	private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);
 
 	/**
 	 * The global configuration object accessible through a singleton pattern.
@@ -460,7 +460,6 @@ public final class GlobalConfiguration {
 			}
 
 			if (!"configuration".equals(root.getNodeName())) {
-				LOG.warn("Cannot load configuration: unknown element " + root.getNodeName());
 				return;
 			}
 
@@ -481,14 +480,11 @@ public final class GlobalConfiguration {
 					}
 
 					if (!(propNode instanceof Element)) {
-						LOG.warn("Error while reading configuration: " + propNode.getNodeName()
-							+ " is not of type element");
 						continue;
 					}
 
 					Element property = (Element) propNode;
 					if (!"property".equals(property.getNodeName())) {
-						LOG.warn("Error while reading configuration: unknown element " + property.getNodeName());
 						continue;
 					}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 2289091..2b2b5e0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -32,8 +32,8 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -64,7 +64,7 @@ public class LocalFileSystem extends FileSystem {
 	 */
 	private final String hostName;
 
-	private static final Log LOG = LogFactory.getLog(LocalFileSystem.class);
+	private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
 
 	/**
 	 * Constructs a new <code>LocalFileSystem</code> object.
@@ -77,7 +77,7 @@ public class LocalFileSystem extends FileSystem {
 		try {
 			tmp = InetAddress.getLocalHost().getHostName();
 		} catch (UnknownHostException e) {
-			LOG.error(e);
+			LOG.error("Could not resolve local host", e);
 		}
 
 		this.hostName = tmp;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/main/java/org/apache/flink/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/LogUtils.java b/flink-core/src/main/java/org/apache/flink/util/LogUtils.java
deleted file mode 100644
index be8c061..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/LogUtils.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
-
-public class LogUtils {
-
-	public static void initializeDefaultConsoleLogger() {
-		initializeDefaultConsoleLogger(Level.INFO);
-	}
-	
-	public static void initializeDefaultTestConsoleLogger() {
-		initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
-	public static void initializeDefaultConsoleLogger(Level logLevel) {
-		Logger root = Logger.getRootLogger();
-		root.removeAllAppenders();
-		PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-		ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-		root.addAppender(appender);
-		root.setLevel(logLevel);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index 889f2b4..6418ce7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -23,16 +23,11 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.flink.api.common.io.BinaryInputFormat;
-import org.apache.flink.api.common.io.BlockInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class BinaryInputFormatTest {
@@ -47,11 +42,6 @@ public class BinaryInputFormatTest {
 		}
 	}
 	
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-
 	@Test
 	public void testCreateInputSplitsWithOneFile() throws IOException {
 		// create temporary file with 3 blocks

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
index 632acd4..9512a73 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.api.common.io;
 
-import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -27,8 +26,6 @@ import org.apache.flink.testutils.TestConfigUtils;
 import org.apache.flink.testutils.TestFileSystem;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -75,8 +72,6 @@ public class DelimitedInputFormatSamplingTest {
 	
 	@BeforeClass
 	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.ERROR);
-		
 		try {
 			TestFileSystem.registerTestFileSysten();
 		} catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index f01e025..7b8e58b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -34,17 +34,13 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
 
-import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class DelimitedInputFormatTest {
@@ -56,12 +52,7 @@ public class DelimitedInputFormatTest {
 	private final DelimitedInputFormat<Record> format = new MyTextInputFormat();
 	
 	// --------------------------------------------------------------------------------------------
-	
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
+
 	@Before
 	public void setup() {
 		this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read"));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 0c12456..9e93e67 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -24,26 +24,18 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 
-import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class FileInputFormatTest { 
 
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.ERROR);
-	}
-	
 	@Test
 	public void testGetStatisticsNonExistingFile() {
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index fd09c07..89e77d8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -24,23 +24,15 @@ import java.io.IOException;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.LogUtils;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class FileOutputFormatTest {
 
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-	
 	@Test
 	public void testCreateNoneParallelLocalFS() {
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index d77901b..3534c84 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -31,8 +31,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -41,11 +39,8 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class GenericCsvInputFormatTest {
@@ -56,11 +51,6 @@ public class GenericCsvInputFormatTest {
 	
 	// --------------------------------------------------------------------------------------------
 
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
 	@Before
 	public void setup() {
 		format = new TestCsvInputFormat();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
index eaf1c09..85003e5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTest.java
@@ -27,12 +27,6 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.flink.api.common.io.BinaryInputFormat;
-import org.apache.flink.api.common.io.BinaryOutputFormat;
-import org.apache.flink.api.common.io.BlockInfo;
-import org.apache.flink.api.common.io.FormatUtil;
-import org.apache.flink.api.common.io.SerializedInputFormat;
-import org.apache.flink.api.common.io.SerializedOutputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -42,12 +36,9 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -79,11 +70,6 @@ public class SequentialFormatTest {
 
 	private File tempFile;
 
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
 	/**
 	 * Initializes SequentialFormatTest.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index c8fc301..d60da17 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -22,7 +22,8 @@ package org.apache.flink.api.common.typeutils;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 
-public class SerializerTestInstance<T> extends SerializerTestBase<T> {
+public class
+		SerializerTestInstance<T> extends SerializerTestBase<T> {
 
 	private final TypeSerializer<T> serializer;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index 8d60854..6aa8a73 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -27,13 +27,8 @@ import java.io.FileNotFoundException;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -41,11 +36,6 @@ import org.junit.Test;
  */
 public class GlobalConfigurationTest {
 
-	@BeforeClass
-	public static void initLogging() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF);
-	}
-	
 	@Before
 	public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
 			IllegalAccessException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/logback-test.xml b/flink-core/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-core/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/assemblies/yarn.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/yarn.xml b/flink-dist/src/main/assemblies/yarn.xml
index f262ab0..71d2815 100644
--- a/flink-dist/src/main/assemblies/yarn.xml
+++ b/flink-dist/src/main/assemblies/yarn.xml
@@ -114,7 +114,7 @@ under the License.
 			<fileMode>0644</fileMode>
 		</file>
 		<file>
-			<source>src/main/flink-bin/conf/log4j.properties</source>
+			<source>src/main/flink-bin/conf/logback.xml</source>
 			<outputDirectory>conf</outputDirectory>
 			<fileMode>0644</fileMode>
 		</file>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index ebe9502..78e8a3e 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -224,7 +224,6 @@ under the Apache License (v 2.0):
  - Apache Commons FileUpload (http://commons.apache.org/fileupload/)
  - Apache Commons IO (http://commons.apache.org/io/)
  - Apache Commons Math (http://commons.apache.org/proper/commons-math/)
- - Apache Log4J (http://logging.apache.org/log4j/1.2/)
  - Apache Avro (http://avro.apache.org)
  - Apache Hadoop (http://hadoop.apache.org)
  - Apache Derby (http://db.apache.org/derby/)
@@ -346,6 +345,19 @@ http://www.mozilla.org/MPL/
 
 
 -----------------------------------------------------------------------
+ Eclipse Public License - v 1.0
+-----------------------------------------------------------------------
+
+The Apache Flink project depends on and/or bundles the following components
+under the Eclipse Public License (v 1.0):
+
+- Logback (http://logback.qos.ch)
+    Copyright (C) 1999-2012, QOS.ch. All rights reserved.
+
+You may obtain a copy of the Eclipse Public License (v 1.0) at
+https://www.eclipse.org/legal/epl-v10.html
+
+-----------------------------------------------------------------------
  The Open Font License
 -----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/NOTICE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/NOTICE b/flink-dist/src/main/flink-bin/NOTICE
index 67c621d..b43d671 100644
--- a/flink-dist/src/main/flink-bin/NOTICE
+++ b/flink-dist/src/main/flink-bin/NOTICE
@@ -33,17 +33,20 @@ with permission from the original authors.
 Original source copyright:
 Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
 
-
 -----------------------------------------------------------------------
-                          Apache Log4J
+                          Logback
 -----------------------------------------------------------------------
 
-ResolverUtil.java
-Copyright 2005-2006 Tim Fennell
+Copyright (C) 1999-2012, QOS.ch
+
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
 
-Dumbster SMTP test server
-Copyright 2004 Jason Paul Kitchen
+  or (per the licensee's choosing)
 
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
 
 -----------------------------------------------------------------------
                        Apache Commmons Lang3

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/bin/flink
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink
index 99b7a77..e33619b 100755
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -52,7 +52,7 @@ constructCLIClientClassPath() {
 CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
 
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-flink-client-$HOSTNAME.log
-log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties"
+log_setting="-Dlog.file="$log" -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
 
 export FLINK_CONF_DIR
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index c2f297c..f8f2ec8 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -58,7 +58,7 @@ FLINK_JM_CLASSPATH=`manglePathList "$(constructJobManagerClassPath)"`
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-jobmanager-$HOSTNAME.log
 out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-jobmanager-$HOSTNAME.out
 pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-jobmanager.pid
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties)
+log_setting=(-Dlog.file="$log" -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
 
 case $STARTSTOP in
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index e77b40c..8089686 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -20,24 +20,24 @@
 setlocal EnableDelayedExpansion
 
 SET bin=%~dp0
-SET NEPHELE_ROOT_DIR=%bin%..
-SET NEPHELE_LIB_DIR=%NEPHELE_ROOT_DIR%\lib
-SET NEPHELE_CONF_DIR=%NEPHELE_ROOT_DIR%\conf
-SET NEPHELE_LOG_DIR=%NEPHELE_ROOT_DIR%\log
+SET FLINK_ROOT_DIR=%bin%..
+SET FLINK_LIB_DIR=%FLINK_ROOT_DIR%\lib
+SET FLINK_CONF_DIR=%FLINK_ROOT_DIR%\conf
+SET FLINK_LOG_DIR=%FLINK_ROOT_DIR%\log
 
 SET JVM_ARGS=-Xms768m -Xmx768m
 
-SET NEPHELE_JM_CLASSPATH=%NEPHELE_LIB_DIR%\*
+SET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*
 
-SET logname=nephele-%username%-jobmanager-%computername%.log
-SET log=%NEPHELE_LOG_DIR%\%logname%
-SET outname=nephele-%username%-jobmanager-%computername%.out
-SET out=%NEPHELE_LOG_DIR%\%outname%
-SET log_setting=-Dlog.file=%log% -Dlog4j.configuration=file:%NEPHELE_CONF_DIR%/log4j.properties
+SET logname=flink-%username%-jobmanager-%computername%.log
+SET log=%FLINK_LOG_DIR%\%logname%
+SET outname=flink-%username%-jobmanager-%computername%.out
+SET out=%FLINK_LOG_DIR%\%outname%
+SET log_setting=-Dlog.file=%log% -Dlogback.configurationFile=file:%FLINK_CONF_DIR%/logback.xml
 
 
 :: Log rotation (quick and dirty)
-CD %NEPHELE_LOG_DIR%
+CD %FLINK_LOG_DIR%
 for /l %%x in (5, -1, 1) do ( 
 SET /A y = %%x+1 
 RENAME "%logname%.%%x" "%logname%.!y!" 2> nul
@@ -57,6 +57,6 @@ if not defined FOUND (
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp %NEPHELE_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager -executionMode local -configDir %NEPHELE_CONF_DIR%  > "%out%"  2>&1
+java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager -executionMode local -configDir %FLINK_CONF_DIR%  > "%out%"  2>&1
 
 endlocal

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 91259b3..a53325e 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -49,7 +49,7 @@ FLINK_TM_CLASSPATH=`manglePathList "$(constructTaskManagerClassPath)"`
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-taskmanager-$HOSTNAME.log
 out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-taskmanager-$HOSTNAME.out
 pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-taskmanager.pid
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties)
+log_setting=(-Dlog.file="$log" -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
 
 JVM_ARGS="$JVM_ARGS -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256m -XX:NewRatio=6"
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/bin/webclient.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/webclient.sh b/flink-dist/src/main/flink-bin/bin/webclient.sh
index cb8c6b3..c7dd2ab 100755
--- a/flink-dist/src/main/flink-bin/bin/webclient.sh
+++ b/flink-dist/src/main/flink-bin/bin/webclient.sh
@@ -57,7 +57,7 @@ FLINK_WEBCLIENT_CLASSPATH=`manglePathList "$(constructWebclientClassPath)"`
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-webclient-$HOSTNAME.log
 out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-webclient-$HOSTNAME.out
 pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-webclient.pid
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties)
+log_setting=(-Dlog.file="$log" -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
 
 case $STARTSTOP in
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
deleted file mode 100644
index 0857ce0..0000000
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, file
-
-# Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.file=${log.file}
-log4j.appender.file.append=false
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/conf/log4jconsole.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4jconsole.properties b/flink-dist/src/main/flink-bin/conf/log4jconsole.properties
deleted file mode 100644
index 020cbeb..0000000
--- a/flink-dist/src/main/flink-bin/conf/log4jconsole.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/conf/logback.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/logback.xml b/flink-dist/src/main/flink-bin/conf/logback.xml
new file mode 100644
index 0000000..9305b11
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/logback.xml
@@ -0,0 +1,31 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="file" class="ch.qos.logback.core.FileAppender">
+        <file>${log.file}</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="file"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/conf/logbackConsole.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/logbackConsole.xml b/flink-dist/src/main/flink-bin/conf/logbackConsole.xml
new file mode 100644
index 0000000..807f40c
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/logbackConsole.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 31b662e..44a80c5 100644
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -48,7 +48,7 @@ constructCLIClientClassPath() {
 CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
 
 #log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
-#log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties"
+#log_setting="-Dlog.file="$log" -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
 
 export FLINK_CONF_DIR
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 10fe3ce..c74fc96 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -22,9 +22,6 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
-
 
 /**
  * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
@@ -35,9 +32,6 @@ import org.apache.log4j.Level;
  * machine.
  */
 public class LocalEnvironment extends ExecutionEnvironment {
-	
-	private boolean logging = false;
-
 	/**
 	 * Creates a new local environment.
 	 */
@@ -54,7 +48,6 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		
 		PlanExecutor executor = PlanExecutor.createLocalExecutor();
-		initLogging();
 		return executor.executePlan(p);
 	}
 	
@@ -63,41 +56,8 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan();
 		
 		PlanExecutor executor = PlanExecutor.createLocalExecutor();
-		initLogging();
 		return executor.getOptimizerPlanAsJSON(p);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Causes the local environment to print INFO level log messages to the standard error output.
-	 */
-	public void enableLogging() {
-		this.logging = true;
-	}
-	
-	/**
-	 * Completely disables logging during the execution of programs in the local environment.
-	 */
-	public void disableLogging() {
-		this.logging = false;
-	}
-
-	/**
-	 * Checks whether logging during the program execution is enabled or disabled.
-	 * <p>
-	 * By default, logging is turned off.
-	 * 
-	 * @return True, if logging is enabled, false otherwise.
-	 */
-	public boolean isLoggingEnabled() {
-		return this.logging;
-	}
-	
-	private void initLogging() {
-		LogUtils.initializeDefaultConsoleLogger(logging ? Level.INFO : Level.OFF);
-	}
-
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 7bc7fc9..71d339c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -43,7 +43,7 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
 	private static final long serialVersionUID = 1L;
 
 	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(CsvOutputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(CsvOutputFormat.class);
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
index 6eae03c..b7a8931 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Record;
@@ -70,7 +70,7 @@ public class CsvOutputFormat extends FileOutputFormat {
 	public static final String LENIENT_PARSING = "output.record.lenient";
 
 	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(CsvOutputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(CsvOutputFormat.class);
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
index 644a6ae..93d9661 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
@@ -26,8 +26,8 @@ import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.util.Arrays;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
@@ -45,7 +45,7 @@ public class TextInputFormat extends DelimitedInputFormat {
 	
 	public static final String DEFAULT_CHARSET_NAME = "UTF-8";
 	
-	private static final Log LOG = LogFactory.getLog(TextInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(TextInputFormat.class);
 	
 	
 	protected final StringValue theString = new StringValue();