You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/05 11:57:58 UTC

[5/5] git commit: [FLINK-1086] Replace JCL with SLF4J and Log4j with LOGBack

[FLINK-1086] Replace JCL with SLF4J and Log4j with LOGBack

- Excluded Kafka's transitive dependencies: jmxtools and jmxri
- Corrected encoder pattern in logback.xml
- Removed explicit logging access. Loggers are now configured by
  configuration files. Fixed Yarn issue with multiple logging
  bindings.

This closes #111.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/08188508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/08188508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/08188508

Branch: refs/heads/master
Commit: 08188508d528c1072a746aacbf2a5c712d4f8467
Parents: 3aa5511
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 28 12:36:53 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Fri Sep 5 11:57:02 2014 +0200

----------------------------------------------------------------------
 DEPENDENCIES                                    | 18 ++++--
 docs/internal_logging.md                        | 61 ++++++++++++++++++++
 docs/internal_overview.md                       |  1 +
 .../flink/api/java/io/AvroInputFormat.java      |  6 +-
 .../java/record/io/avro/AvroInputFormat.java    |  6 +-
 .../record/io/avro/AvroRecordInputFormat.java   |  6 +-
 .../api/avro/AvroExternalJarProgramITCase.java  |  5 --
 .../flink/api/avro/EncoderDecoderTest.java      |  4 --
 .../io/avro/AvroRecordInputFormatTest.java      |  4 +-
 .../src/test/resources/logback-test.xml         | 29 ++++++++++
 .../mapred/HadoopInputFormat.java               |  6 +-
 .../mapreduce/HadoopInputFormat.java            |  6 +-
 .../src/test/resources/logback-test.xml         | 29 ++++++++++
 .../flink/addons/hbase/TableInputFormat.java    |  6 +-
 .../flink/api/java/io/jdbc/JDBCInputFormat.java |  6 +-
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  6 +-
 .../java/record/io/jdbc/JDBCInputFormat.java    |  6 +-
 .../api/java/io/jdbc/JDBCInputFormatTest.java   |  1 -
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  2 -
 .../record/io/jdbc/JDBCOutputFormatTest.java    |  2 -
 .../src/test/resources/logback-test.xml         | 29 ++++++++++
 .../src/test/resources/logback-test.xml         | 29 ++++++++++
 .../flink-streaming-connectors/pom.xml          | 10 ++++
 .../streaming/connectors/flume/FlumeSink.java   |  6 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |  6 +-
 .../connectors/rabbitmq/RMQSource.java          | 14 ++---
 .../connectors/twitter/TwitterSource.java       |  6 +-
 .../src/test/resources/logback-test.xml         | 30 ++++++++++
 .../flink/streaming/api/JobGraphBuilder.java    |  6 +-
 .../api/collector/DirectedStreamCollector.java  | 26 ++++-----
 .../api/collector/StreamCollector.java          | 10 ++--
 .../environment/RemoteStreamEnvironment.java    |  6 +-
 .../api/invokable/StreamOperatorInvokable.java  |  6 +-
 .../streamcomponent/StreamIterationSink.java    |  6 +-
 .../streamcomponent/StreamIterationSource.java  |  6 +-
 .../api/streamcomponent/StreamSink.java         |  6 +-
 .../flink/streaming/util/ClusterUtil.java       |  6 +-
 .../apache/flink/streaming/util/LogUtils.java   | 48 ---------------
 .../flink/streaming/util/TestDataUtil.java      |  6 +-
 .../apache/flink/streaming/api/IterateTest.java |  3 -
 .../apache/flink/streaming/api/PrintTest.java   |  3 -
 .../flink/streaming/api/WriteAsCsvTest.java     |  4 --
 .../flink/streaming/api/WriteAsTextTest.java    |  4 --
 .../api/collector/DirectedOutputTest.java       |  3 -
 .../api/invokable/operator/CoFlatMapTest.java   |  4 --
 .../streamcomponent/StreamComponentTest.java    | 11 ----
 .../src/test/resources/logback-test.xml         | 30 ++++++++++
 .../main/java/org/apache/flink/yarn/Client.java | 59 +++++++------------
 .../apache/flink/yarn/ClientMasterControl.java  |  6 +-
 .../main/java/org/apache/flink/yarn/Utils.java  | 12 ++--
 .../flink/yarn/appMaster/ApplicationMaster.java | 23 ++++----
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  8 +--
 .../org/apache/flink/client/LocalExecutor.java  | 15 +----
 .../org/apache/flink/client/WebFrontend.java    |  6 +-
 .../client/minicluster/NepheleMiniCluster.java  |  6 +-
 .../org/apache/flink/client/program/Client.java |  6 +-
 .../flink/client/web/JobSubmissionServlet.java  |  8 +--
 .../flink/client/web/PactJobJSONServlet.java    |  6 +-
 .../flink/client/web/WebInterfaceServer.java    |  6 +-
 .../org/apache/flink/compiler/PactCompiler.java |  6 +-
 .../plantranslate/NepheleJobGraphGenerator.java |  2 +-
 .../apache/flink/compiler/CompilerTestBase.java |  7 ---
 .../src/test/resources/logback-test.xml         | 29 ++++++++++
 flink-core/pom.xml                              |  2 +-
 .../flink/api/common/io/BinaryInputFormat.java  |  6 +-
 .../api/common/io/DelimitedInputFormat.java     |  6 +-
 .../flink/api/common/io/FileInputFormat.java    |  6 +-
 .../flink/api/common/io/FileOutputFormat.java   |  6 +-
 .../operators/base/BulkIterationBase.java       |  6 +-
 .../configuration/GlobalConfiguration.java      | 10 +---
 .../flink/core/fs/local/LocalFileSystem.java    |  8 +--
 .../java/org/apache/flink/util/LogUtils.java    | 45 ---------------
 .../api/common/io/BinaryInputFormatTest.java    | 10 ----
 .../io/DelimitedInputFormatSamplingTest.java    |  5 --
 .../api/common/io/DelimitedInputFormatTest.java | 11 +---
 .../api/common/io/FileInputFormatTest.java      |  8 ---
 .../api/common/io/FileOutputFormatTest.java     |  8 ---
 .../common/io/GenericCsvInputFormatTest.java    | 10 ----
 .../api/common/io/SequentialFormatTest.java     | 14 -----
 .../typeutils/SerializerTestInstance.java       |  3 +-
 .../configuration/GlobalConfigurationTest.java  | 10 ----
 flink-core/src/test/resources/logback-test.xml  | 29 ++++++++++
 flink-dist/src/main/assemblies/yarn.xml         |  2 +-
 flink-dist/src/main/flink-bin/LICENSE           | 14 ++++-
 flink-dist/src/main/flink-bin/NOTICE            | 15 +++--
 flink-dist/src/main/flink-bin/bin/flink         |  2 +-
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |  2 +-
 .../src/main/flink-bin/bin/start-local.bat      | 24 ++++----
 .../src/main/flink-bin/bin/taskmanager.sh       |  2 +-
 flink-dist/src/main/flink-bin/bin/webclient.sh  |  2 +-
 .../src/main/flink-bin/conf/log4j.properties    | 27 ---------
 .../main/flink-bin/conf/log4jconsole.properties | 25 --------
 flink-dist/src/main/flink-bin/conf/logback.xml  | 31 ++++++++++
 .../src/main/flink-bin/conf/logbackConsole.xml  | 30 ++++++++++
 .../src/main/flink-bin/yarn-bin/yarn-session.sh |  2 +-
 .../apache/flink/api/java/LocalEnvironment.java | 40 -------------
 .../flink/api/java/io/CsvOutputFormat.java      |  6 +-
 .../api/java/record/io/CsvOutputFormat.java     |  6 +-
 .../api/java/record/io/TextInputFormat.java     |  6 +-
 .../flink/api/java/io/CsvInputFormatTest.java   | 11 +---
 .../flink/api/java/io/TextInputFormatTest.java  |  9 ---
 .../api/java/record/io/CsvInputFormatTest.java  | 10 ----
 .../record/io/FixedLenghtInputFormatTest.java   | 11 +---
 .../api/java/record/io/TextInputFormatTest.java | 10 ----
 .../apache/flink/runtime/client/JobClient.java  |  6 +-
 .../execution/ExecutionStateTransition.java     |  6 +-
 .../runtime/execution/RuntimeEnvironment.java   |  8 +--
 .../runtime/executiongraph/ExecutionGraph.java  |  6 +-
 .../executiongraph/ExecutionGraphIterator.java  |  6 +-
 .../executiongraph/ExecutionSignature.java      |  6 +-
 .../runtime/executiongraph/ExecutionVertex.java |  6 +-
 .../runtime/fs/hdfs/DistributedFileSystem.java  |  6 +-
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |  8 +--
 .../flink/runtime/fs/s3/S3FileSystem.java       |  6 +-
 .../instance/DefaultInstanceManager.java        |  6 +-
 .../instance/HardwareDescriptionFactory.java    |  8 +--
 .../runtime/io/disk/iomanager/IOManager.java    |  8 +--
 .../runtime/io/network/ChannelManager.java      |  6 +-
 .../bufferprovider/GlobalBufferPool.java        |  6 +-
 .../io/network/channels/InputChannel.java       |  6 +-
 .../io/network/channels/OutputChannel.java      |  6 +-
 .../runtime/io/network/gates/InputGate.java     |  6 +-
 .../network/netty/InboundEnvelopeDecoder.java   |  6 +-
 .../network/netty/NettyConnectionManager.java   |  6 +-
 .../network/netty/OutboundConnectionQueue.java  |  6 +-
 .../org/apache/flink/runtime/ipc/Client.java    | 12 ++--
 .../java/org/apache/flink/runtime/ipc/RPC.java  |  6 +-
 .../org/apache/flink/runtime/ipc/Server.java    |  8 +--
 .../WorksetEmptyConvergenceCriterion.java       |  6 +-
 .../task/AbstractIterativePactTask.java         |  6 +-
 .../iterative/task/IterationHeadPactTask.java   |  6 +-
 .../task/IterationIntermediatePactTask.java     |  6 +-
 .../task/IterationSynchronizationSinkTask.java  |  6 +-
 .../iterative/task/IterationTailPactTask.java   |  6 +-
 .../iterative/task/SyncEventHandler.java        |  2 +-
 .../flink/runtime/jobmanager/JobManager.java    | 30 +++-------
 .../runtime/jobmanager/JobManagerUtils.java     |  6 +-
 .../jobmanager/scheduler/DefaultScheduler.java  | 10 ++--
 .../DefaultInputSplitAssigner.java              |  6 +-
 .../splitassigner/InputSplitManager.java        |  6 +-
 .../splitassigner/InputSplitTracker.java        |  6 +-
 .../LocatableInputSplitAssigner.java            |  6 +-
 .../splitassigner/LocatableInputSplitList.java  |  6 +-
 .../file/FileInputSplitAssigner.java            |  6 +-
 .../splitassigner/file/FileInputSplitList.java  |  6 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |  6 +-
 .../jobmanager/web/LogfileInfoServlet.java      | 17 +++---
 .../runtime/jobmanager/web/MenuServlet.java     |  8 +--
 .../jobmanager/web/SetupInfoServlet.java        |  6 +-
 .../runtime/jobmanager/web/WebInfoServer.java   |  6 +-
 .../ManagementGraphIterator.java                |  6 +-
 .../memorymanager/DefaultMemoryManager.java     |  8 +--
 .../org/apache/flink/runtime/net/NetUtils.java  |  8 +--
 .../flink/runtime/net/SocketIOWithTimeout.java  |  6 +-
 .../runtime/operators/AllGroupReduceDriver.java |  8 +--
 .../runtime/operators/AllReduceDriver.java      |  8 +--
 .../flink/runtime/operators/CoGroupDriver.java  |  6 +-
 .../flink/runtime/operators/CrossDriver.java    |  6 +-
 .../flink/runtime/operators/DataSinkTask.java   |  6 +-
 .../flink/runtime/operators/DataSourceTask.java |  6 +-
 .../operators/GroupReduceCombineDriver.java     |  6 +-
 .../runtime/operators/GroupReduceDriver.java    |  8 +--
 .../flink/runtime/operators/MatchDriver.java    |  6 +-
 .../runtime/operators/ReduceCombineDriver.java  |  6 +-
 .../flink/runtime/operators/ReduceDriver.java   |  6 +-
 .../runtime/operators/RegularPactTask.java      |  6 +-
 .../operators/hash/CompactingHashTable.java     |  6 +-
 .../operators/hash/MutableHashTable.java        |  6 +-
 .../AbstractBlockResettableIterator.java        |  6 +-
 .../resettable/BlockResettableIterator.java     |  6 +-
 .../BlockResettableMutableObjectIterator.java   |  6 +-
 .../resettable/SpillingResettableIterator.java  |  6 +-
 ...SpillingResettableMutableObjectIterator.java |  6 +-
 .../sort/CombiningUnilateralSortMerger.java     |  6 +-
 .../operators/sort/MergeMatchIterator.java      |  6 +-
 .../operators/sort/UnilateralSortMerger.java    |  6 +-
 .../flink/runtime/profiling/ProfilingUtils.java | 23 +++++---
 .../profiling/impl/EnvironmentListenerImpl.java |  6 +-
 .../profiling/impl/JobManagerProfilerImpl.java  |  6 +-
 .../profiling/impl/TaskManagerProfilerImpl.java |  8 +--
 .../apache/flink/runtime/taskmanager/Task.java  |  6 +-
 .../flink/runtime/taskmanager/TaskManager.java  | 26 ++++-----
 .../runtime/util/EnvironmentInformation.java    | 17 ++----
 .../org/apache/flink/runtime/util/IOUtils.java  |  4 +-
 .../apache/flink/runtime/AbstractIDTest.java    |  1 -
 .../flink/runtime/client/JobResultTest.java     |  5 +-
 .../ChannelDeploymentDescriptorTest.java        |  1 -
 .../GateDeploymentDescriptorTest.java           |  2 -
 .../TaskDeploymentDescriptorTest.java           |  3 -
 .../flink/runtime/event/job/JobEventTest.java   |  3 -
 .../runtime/event/job/ManagementEventTest.java  |  4 --
 .../task/EventNotificationManagerTest.java      |  6 --
 .../flink/runtime/event/task/TaskEventTest.java |  6 +-
 .../executiongraph/ExecutionGraphTest.java      | 16 -----
 .../FileCacheDeleteValidationTest.java          |  1 -
 .../apache/flink/runtime/fs/LineReaderTest.java |  2 +-
 .../instance/DefaultInstanceManagerTest.java    |  8 ---
 .../io/disk/iomanager/IOManagerITCase.java      | 14 ++---
 .../IOManagerPerformanceBenchmark.java          |  6 +-
 .../bufferprovider/LocalBufferPoolTest.java     |  8 ---
 .../runtime/jobmanager/JobManagerITCase.java    | 44 --------------
 .../runtime/operators/DataSinkTaskTest.java     | 19 +++---
 .../runtime/operators/DataSourceTaskTest.java   |  4 +-
 .../flink/runtime/operators/MapTaskTest.java    |  8 +--
 .../operators/ReduceTaskExternalITCase.java     | 14 ++---
 .../flink/runtime/operators/ReduceTaskTest.java | 14 ++---
 .../operators/chaining/ChainTaskTest.java       | 11 ----
 .../hash/HashFunctionCollisionBenchmark.java    | 12 ++--
 .../runtime/operators/hash/HashTableITCase.java |  4 +-
 .../hash/ReOpenableHashTableITCase.java         |  3 -
 .../CombiningUnilateralSortMergerITCase.java    | 15 +----
 .../operators/sort/ExternalSortITCase.java      |  8 +--
 .../sort/SortMergeCoGroupIteratorITCase.java    |  1 -
 .../operators/testutils/DriverTestBase.java     | 12 ----
 .../src/test/resources/logback-test.xml         | 36 ++++++++++++
 .../test/compiler/util/CompilerTestBase.java    |  2 +-
 .../flink/test/util/AbstractTestBase.java       |  4 --
 .../KMeansIterativeNepheleITCase.java           |  4 --
 .../test/cancelling/CancellingTestBase.java     | 17 ++----
 .../clients/examples/LocalExecutorITCase.java   |  2 -
 .../compiler/plandump/PreviewPlanDumpTest.java  |  2 +-
 .../DiffL1NormConvergenceCriterion.java         |  6 +-
 .../PackagedProgramEndToEndITCase.java          |  5 --
 .../apache/flink/test/operators/JoinITCase.java |  6 +-
 .../apache/flink/test/operators/MapITCase.java  |  6 +-
 .../flink/test/operators/UnionITCase.java       |  6 +-
 .../operators/io/ContractITCaseIOFormats.java   |  6 +-
 .../DiffL1NormConvergenceCriterion.java         |  6 +-
 .../test/recordJobs/relational/TPCHQuery4.java  |  7 ++-
 .../test/recordJobs/relational/TPCHQuery9.java  |  7 ++-
 .../relational/query1Util/LineItemFilter.java   |  9 +--
 .../query1Util/LineItemFilterTest.java          |  2 +-
 .../test/runtime/NetworkStackThroughput.java    |  9 +--
 .../apache/flink/test/util/FailingTestBase.java |  9 ---
 flink-tests/src/test/resources/logback-test.xml | 34 +++++++++++
 pom.xml                                         | 35 ++++-------
 236 files changed, 1068 insertions(+), 1227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/DEPENDENCIES
----------------------------------------------------------------------
diff --git a/DEPENDENCIES b/DEPENDENCIES
index 118a48e..5cf024e 100644
--- a/DEPENDENCIES
+++ b/DEPENDENCIES
@@ -17,7 +17,6 @@ under the Apache License (v 2.0):
  - Apache Commons FileUpload (http://commons.apache.org/fileupload/)
  - Apache Commons IO (http://commons.apache.org/io/)
  - Apache Commons Math (http://commons.apache.org/proper/commons-math/)
- - Apache Log4J (http://logging.apache.org/log4j/1.2/)
  - Apache Avro (http://avro.apache.org)
  - Apache Hadoop (http://hadoop.apache.org)
  - Apache Derby (http://db.apache.org/derby/)
@@ -43,6 +42,8 @@ The Apache Flink project depends on and/or bundles the following components
 under the Eclipse Public License (v 1.0)
 
  - JUnit (http://junit.org/)
+ - LOGback (http://logback.qos.ch)
+     Copyright (C) 1999-2012, QOS.ch. All rights reserved.
  
 You may obtain a copy of the Eclipse Public License (v 1.0) at
 https://www.eclipse.org/legal/epl-v10.html
@@ -188,14 +189,19 @@ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
 
 
 -----------------------------------------------------------------------
-                          Apache Log4J
+                          LOGBack
 -----------------------------------------------------------------------
 
-ResolverUtil.java
-Copyright 2005-2006 Tim Fennell
+Copyright (C) 1999-2012, QOS.ch
 
-Dumbster SMTP test server
-Copyright 2004 Jason Paul Kitchen
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
+
+  or (per the licensee's choosing)
+
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
 
 
 -----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/docs/internal_logging.md
----------------------------------------------------------------------
diff --git a/docs/internal_logging.md b/docs/internal_logging.md
new file mode 100644
index 0000000..6d95360
--- /dev/null
+++ b/docs/internal_logging.md
@@ -0,0 +1,61 @@
+---
+title: "How to use logging"
+---
+
+The logging in Flink is implemented using the slf4j logging interface. As underlying logging framework, logback is used.
+
+## Configuring logback
+
+For users and developers alike it is important to control the logging framework. 
+The configuration of the logging framework is exclusively done by configuration files.
+The configuration file either has to be specified by setting the environment property `-Dlogback.configurationFile=<file>` or by putting `logback.xml` in the classpath.
+The `conf` directory contains a `logback.xml` file which can be modified and is used if Flink is started outside of an IDE and with the provided starting scripts.
+The provided `logback.xml` has the following form:
+
+``` xml
+<configuration>
+    <appender name="file" class="ch.qos.logback.core.FileAppender">
+        <file>${log.file}</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="file"/>
+    </root>
+</configuration>
+```
+
+In order to control the logging level of `org.apache.flink.runtime.jobgraph.JobGraph`, for example, one would have to add the following line to the configuration file.
+``` xml
+<logger name="org.apache.flink.runtime.jobgraph.JobGraph" level="DEBUG"/>
+```
+
+For further information on configuring logback see [LOGback's manual](http://logback.qos.ch/manual/configuration.html).
+
+## Best practices for developers
+
+The loggers using slf4j are created by calling
+``` java
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
+
+Logger LOG = LoggerFactory.getLogger(Foobar.class)
+```
+
+In order to benefit most from slf4j, it is recommended to use its placeholder mechanism.
+Using placeholders allows to avoid unnecessary string constructions in case that the logging level is set so high that the message would not be logged.
+The syntax of placeholders is the following:
+``` java
+LOG.info("This message contains {} placeholders. {}", 2, "Yippie");
+```
+
+Placeholders can also be used in conjunction with exceptions which shall be logged.
+
+``` java
+catch(Exception exception){
+	LOG.error("An {} occurred.", "error", exception);
+}
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/docs/internal_overview.md
----------------------------------------------------------------------
diff --git a/docs/internal_overview.md b/docs/internal_overview.md
index f3090d0..4c71c6e 100644
--- a/docs/internal_overview.md
+++ b/docs/internal_overview.md
@@ -39,3 +39,4 @@ or pull request that updates these documents as well.*
 - [RPC and JobManager Communication](rpc_transfer.html)
 -->
 
+- [How-to: Using logging in Flink](internal_logging.html)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 1031d81..a85eb22 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -27,8 +27,8 @@ import org.apache.avro.file.SeekableInput;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -43,7 +43,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
 	
 	
 	private final Class<E> avroValueType;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
index ab96895..2f31837 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
@@ -27,8 +27,8 @@ import org.apache.avro.file.SeekableInput;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.AvroBaseValue;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
 import org.apache.flink.api.java.record.io.FileInputFormat;
@@ -42,7 +42,7 @@ public class AvroInputFormat<E> extends FileInputFormat {
 	
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
 	
 	
 	private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
index 1464ca9..be13114 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
@@ -32,8 +32,8 @@ import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
 import org.apache.flink.api.java.record.io.FileInputFormat;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -64,7 +64,7 @@ import org.apache.flink.types.Value;
 public class AvroRecordInputFormat extends FileInputFormat {
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(AvroRecordInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
 
 	private FileReader<GenericRecord> dataFileReader;
 	private GenericRecord reuseAvroRecord = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 74a1e15..2e4b141 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.LogUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,10 +35,6 @@ public class AvroExternalJarProgramITCase {
 	
 	private static final String TEST_DATA_FILE = "/testdata.avro";
 
-	static {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-	
 	@Test
 	public void testExternalProgram() {
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 76b23ef..05ad284 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -31,8 +31,6 @@ import java.util.Random;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.DataInputDecoder;
-import org.apache.flink.api.avro.DataOutputEncoder;
 import org.apache.flink.api.java.record.io.avro.generated.Colors;
 import org.apache.flink.api.java.record.io.avro.generated.User;
 import org.apache.flink.util.StringUtils;
@@ -40,12 +38,10 @@ import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-
 /**
  * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
  */
 public class EncoderDecoderTest {
-	
 	@Test
 	public void testComplexStringsDirecty() {
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
index 64fae94..e0610f0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
@@ -29,7 +29,6 @@ import org.junit.Assert;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat;
 import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
 import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
 import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
@@ -68,8 +67,7 @@ public class AvroRecordInputFormatTest {
 	final static long TEST_MAP_VALUE1 = 8546456L;
 	final static CharSequence TEST_MAP_KEY2 = "KEY 2";
 	final static long TEST_MAP_VALUE2 = 17554L;
-	
-	
+
 	@Before
 	public void createFiles() throws IOException {
 		testFile = File.createTempFile("AvroInputFormatTest", null);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/resources/logback-test.xml b/flink-addons/flink-avro/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index 030d7f2..c224d64 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -24,8 +24,8 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -51,7 +51,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 	
 	private static final long serialVersionUID = 1L;
 	
-	private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
 	
 	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
 	private Class<K> keyClass;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index cf12cae..de443c1 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -25,8 +25,8 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -54,7 +54,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 	
 	private static final long serialVersionUID = 1L;
 	
-	private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
 	
 	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
 	private Class<K> keyClass;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 9ff5af7..fdf3c6c 100644
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -23,8 +23,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.addons.hbase.common.HBaseKey;
 import org.apache.flink.addons.hbase.common.HBaseResult;
 import org.apache.flink.addons.hbase.common.HBaseUtil;
@@ -51,7 +51,7 @@ public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
 
 	/** A handle on an HBase table */
 	private HTable table;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index ac8bc07..b17f658 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -26,8 +26,8 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -48,7 +48,7 @@ public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, Inpu
 	private static final long serialVersionUID = 1L;
 
 	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
 
 	private String username;
 	private String password;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 3a75480..9e08206 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -24,8 +24,8 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
@@ -42,7 +42,7 @@ public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(JDBCOutputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
 
 	private String username;
 	private String password;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
index f2930f2..9a4d927 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
@@ -25,8 +25,8 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.java.record.io.GenericInputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -56,7 +56,7 @@ public class JDBCInputFormat extends GenericInputFormat implements NonParallelIn
 	private static final long serialVersionUID = 1L;
 	
 	@SuppressWarnings("unused")
-	private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
 	
 
 	public final String DRIVER_KEY = "driver";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 41e9c5a..84895bf 100644
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -26,7 +26,6 @@ import java.sql.Statement;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index a101a74..904c6d8 100644
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -26,8 +26,6 @@ import java.sql.Statement;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
index 994eaf5..fd0764e 100644
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
@@ -26,8 +26,6 @@ import java.sql.Statement;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.FloatValue;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/resources/logback-test.xml b/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-spargel/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/resources/logback-test.xml b/flink-addons/flink-spargel/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index 2c61fc9..3974720 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -52,6 +52,16 @@ under the License.
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_2.10</artifactId>
 			<version>0.8.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 5282704..c4618a7 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flume.Event;
@@ -31,7 +31,7 @@ import org.apache.flume.event.EventBuilder;
 public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(FlumeSink.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
 
 	private transient FlinkRpcClientFacade client;
 	boolean initDone = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index a809c7a..ae04298 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 import com.rabbitmq.client.Channel;
@@ -30,7 +30,7 @@ import com.rabbitmq.client.ConnectionFactory;
 public abstract class RMQSink<IN> implements SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(RMQSink.class);
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
 
 	private boolean sendAndClose = false;
 	private boolean closeWithoutSend = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 30c9097..1fcd57b 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -1,4 +1,4 @@
-/**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,15 +12,15 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import net.spy.memcached.compat.log.Logger;
+import net.spy.memcached.compat.log.LoggerFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import com.rabbitmq.client.QueueingConsumer;
 public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(RMQSource.class);
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
 
 	private final String QUEUE_NAME;
 	private final String HOST_NAME;
@@ -86,9 +86,7 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 			try {
 				delivery = consumer.nextDelivery();
 			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
-				}
+				LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
 			}
 
 			outTuple = deserialize(delivery.getBody());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 8a1e29e..4aa7a43 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -25,10 +25,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
@@ -46,7 +46,7 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
  */
 public class TwitterSource extends RichSourceFunction<String> {
 
-	private static final Log LOG = LogFactory.getLog(TwitterSource.class);
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
 
 	private static final long serialVersionUID = 1L;
 	private String authPath;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 77c860b..aaa7161 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -22,8 +22,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -54,7 +54,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
  */
 public class JobGraphBuilder {
 
-	private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
 	private final JobGraph jobGraph;
 
 	// Graph attributes

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 162c5df..54b1a98 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -1,4 +1,4 @@
-/**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,21 +12,21 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.api.collector;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A StreamCollector that uses user defined output names and a user defined
@@ -38,7 +38,7 @@ import org.apache.flink.util.StringUtils;
 public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 
 	OutputSelector<OUT> outputSelector;
-	private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
 	private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 65098d4..c6ba1ef 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -1,4 +1,4 @@
-/**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,7 +12,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.api.collector;
@@ -22,13 +22,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Collector for tuples in Apache Flink stream processing. The collected values
@@ -40,7 +40,7 @@ import org.apache.flink.util.StringUtils;
  */
 public class StreamCollector<OUT> implements Collector<OUT> {
 
-	private static final Log LOG = LogFactory.getLog(StreamCollector.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
 
 	protected StreamRecord<OUT> streamRecord;
 	protected int channelID;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9169d65..b2fcf89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -21,8 +21,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -32,7 +32,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
-	private static final Log LOG = LogFactory.getLog(RemoteStreamEnvironment.class);
+	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
 	private String host;
 	private int port;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
index d92d1f0..799f647 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.streaming.api.invokable;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The StreamOperatorInvokable represents the base class for all operators in
@@ -42,7 +42,7 @@ public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<O
 	}
 
 	private static final long serialVersionUID = 1L;
-	private static final Log LOG = LogFactory.getLog(StreamInvokable.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorInvokable.class);
 
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
 	protected StreamRecordSerializer<IN> serializer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 7b4f3ff..8a1a637 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.streamcomponent;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
@@ -30,7 +30,7 @@ import org.apache.flink.util.StringUtils;
 public class StreamIterationSink<IN extends Tuple> extends
 		AbstractStreamComponent {
 
-	private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
 
 	private InputHandler<IN> inputHandler;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 7de80b2..ab02d84 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -21,8 +21,8 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.io.BlockingQueueBroker;
 
 public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
 
-	private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSource.class);
 
 	private OutputHandler<OUT> outputHandler;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 145b709..0797cc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StreamSink<IN> extends AbstractStreamComponent {
 
-	private static final Log LOG = LogFactory.getLog(StreamSink.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
 
 	private InputHandler<IN> inputHandler;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 50a78b3..0853ec7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,17 +19,17 @@ package org.apache.flink.streaming.util;
 
 import java.net.InetSocketAddress;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClusterUtil {
 
-	private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
 	public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java
deleted file mode 100644
index 82a6119..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
-public class LogUtils {
-
-	public static void initializeDefaultConsoleLogger() {
-		initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
-	}
-
-	public static void initializeDefaultConsoleLogger(Level logLevel, Level rootLevel) {
-		Logger logger = Logger.getLogger("org.apache.flink.streaming");
-		logger.removeAllAppenders();
-		logger.setAdditivity(false);
-		PatternLayout layout = new PatternLayout();
-		// layout.setConversionPattern("%highlight{%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n}");
-		// TODO Add highlight
-		layout.setConversionPattern("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-		ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-		logger.addAppender(appender);
-		logger.setLevel(logLevel);
-
-		Logger root = Logger.getRootLogger();
-		root.removeAllAppenders();
-		root.addAppender(appender);
-		root.setLevel(rootLevel);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
index 9a19256..53eafaa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
@@ -30,14 +30,14 @@ import java.net.URL;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestDataUtil {
 
 	// TODO: Exception handling
 	// TODO: check checksum after download
-	private static final Log LOG = LogFactory.getLog(TestDataUtil.class);
+	private static final Logger LOG = LoggerFactory.getLogger(TestDataUtil.class);
 	public static final String testDataDir = "src/test/resources/testdata/";
 	public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
 	public static final String testChekSumDir = "src/test/resources/testdata_checksum/";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index da9de05..83f7e8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.LogUtils;
 import org.junit.Test;
 
 public class IterateTest {
@@ -74,8 +73,6 @@ public class IterateTest {
 
 	@Test
 	public void test() throws Exception {
-		LogUtils.initializeDefaultTestConsoleLogger();
-
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		env.setBufferTimeout(10);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 43c36a0..8baa99f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.LogUtils;
 import org.junit.Test;
 
 public class PrintTest{
@@ -48,8 +47,6 @@ public class PrintTest{
 	
 	@Test
 	public void test() throws Exception {
-		LogUtils.initializeDefaultTestConsoleLogger();
-
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
 		env.executeTest(MEMORYSIZE);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 417ed85..7e87537 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.LogUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -133,9 +132,6 @@ public class WriteAsCsvTest {
 	
 	@Test
 	public void test() throws Exception {
-
-		LogUtils.initializeDefaultTestConsoleLogger();
-		
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index eea97ec..df970ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.LogUtils;
 import org.apache.flink.util.Collector;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -133,9 +132,6 @@ public class WriteAsTextTest {
 
 	@Test
 	public void test() throws Exception {
-		
-		LogUtils.initializeDefaultTestConsoleLogger();
-		
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		
 		@SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 1580235..1608b7b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.Test;
 
 public class DirectedOutputTest {
@@ -90,7 +88,6 @@ public class DirectedOutputTest {
 	@SuppressWarnings("unused")
 	@Test
 	public void directOutputTest() throws Exception {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector(),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index b31b39c..e7f12bd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -29,10 +29,8 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.streaming.util.MockCoInvokable;
 import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
 import org.junit.Test;
 
 public class CoFlatMapTest implements Serializable {
@@ -70,8 +68,6 @@ public class CoFlatMapTest implements Serializable {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void multipleInputTest() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);