You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/05 11:57:58 UTC
[5/5] git commit: [FLINK-1086] Replace JCL with SLF4J and Log4j with
LOGBack
[FLINK-1086] Replace JCL with SLF4J and Log4j with LOGBack
- Excluded Kafka's transitive dependencies: jmxtools and jmxri
- Corrected encoder pattern in logback.xml
- Removed explicit logging access. Loggers are now configured by
configuration files. Fixed Yarn issue with multiple logging
bindings.
This closes #111.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/08188508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/08188508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/08188508
Branch: refs/heads/master
Commit: 08188508d528c1072a746aacbf2a5c712d4f8467
Parents: 3aa5511
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 28 12:36:53 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Fri Sep 5 11:57:02 2014 +0200
----------------------------------------------------------------------
DEPENDENCIES | 18 ++++--
docs/internal_logging.md | 61 ++++++++++++++++++++
docs/internal_overview.md | 1 +
.../flink/api/java/io/AvroInputFormat.java | 6 +-
.../java/record/io/avro/AvroInputFormat.java | 6 +-
.../record/io/avro/AvroRecordInputFormat.java | 6 +-
.../api/avro/AvroExternalJarProgramITCase.java | 5 --
.../flink/api/avro/EncoderDecoderTest.java | 4 --
.../io/avro/AvroRecordInputFormatTest.java | 4 +-
.../src/test/resources/logback-test.xml | 29 ++++++++++
.../mapred/HadoopInputFormat.java | 6 +-
.../mapreduce/HadoopInputFormat.java | 6 +-
.../src/test/resources/logback-test.xml | 29 ++++++++++
.../flink/addons/hbase/TableInputFormat.java | 6 +-
.../flink/api/java/io/jdbc/JDBCInputFormat.java | 6 +-
.../api/java/io/jdbc/JDBCOutputFormat.java | 6 +-
.../java/record/io/jdbc/JDBCInputFormat.java | 6 +-
.../api/java/io/jdbc/JDBCInputFormatTest.java | 1 -
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 2 -
.../record/io/jdbc/JDBCOutputFormatTest.java | 2 -
.../src/test/resources/logback-test.xml | 29 ++++++++++
.../src/test/resources/logback-test.xml | 29 ++++++++++
.../flink-streaming-connectors/pom.xml | 10 ++++
.../streaming/connectors/flume/FlumeSink.java | 6 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 6 +-
.../connectors/rabbitmq/RMQSource.java | 14 ++---
.../connectors/twitter/TwitterSource.java | 6 +-
.../src/test/resources/logback-test.xml | 30 ++++++++++
.../flink/streaming/api/JobGraphBuilder.java | 6 +-
.../api/collector/DirectedStreamCollector.java | 26 ++++-----
.../api/collector/StreamCollector.java | 10 ++--
.../environment/RemoteStreamEnvironment.java | 6 +-
.../api/invokable/StreamOperatorInvokable.java | 6 +-
.../streamcomponent/StreamIterationSink.java | 6 +-
.../streamcomponent/StreamIterationSource.java | 6 +-
.../api/streamcomponent/StreamSink.java | 6 +-
.../flink/streaming/util/ClusterUtil.java | 6 +-
.../apache/flink/streaming/util/LogUtils.java | 48 ---------------
.../flink/streaming/util/TestDataUtil.java | 6 +-
.../apache/flink/streaming/api/IterateTest.java | 3 -
.../apache/flink/streaming/api/PrintTest.java | 3 -
.../flink/streaming/api/WriteAsCsvTest.java | 4 --
.../flink/streaming/api/WriteAsTextTest.java | 4 --
.../api/collector/DirectedOutputTest.java | 3 -
.../api/invokable/operator/CoFlatMapTest.java | 4 --
.../streamcomponent/StreamComponentTest.java | 11 ----
.../src/test/resources/logback-test.xml | 30 ++++++++++
.../main/java/org/apache/flink/yarn/Client.java | 59 +++++++------------
.../apache/flink/yarn/ClientMasterControl.java | 6 +-
.../main/java/org/apache/flink/yarn/Utils.java | 12 ++--
.../flink/yarn/appMaster/ApplicationMaster.java | 23 ++++----
.../yarn/appMaster/YarnTaskManagerRunner.java | 8 +--
.../org/apache/flink/client/LocalExecutor.java | 15 +----
.../org/apache/flink/client/WebFrontend.java | 6 +-
.../client/minicluster/NepheleMiniCluster.java | 6 +-
.../org/apache/flink/client/program/Client.java | 6 +-
.../flink/client/web/JobSubmissionServlet.java | 8 +--
.../flink/client/web/PactJobJSONServlet.java | 6 +-
.../flink/client/web/WebInterfaceServer.java | 6 +-
.../org/apache/flink/compiler/PactCompiler.java | 6 +-
.../plantranslate/NepheleJobGraphGenerator.java | 2 +-
.../apache/flink/compiler/CompilerTestBase.java | 7 ---
.../src/test/resources/logback-test.xml | 29 ++++++++++
flink-core/pom.xml | 2 +-
.../flink/api/common/io/BinaryInputFormat.java | 6 +-
.../api/common/io/DelimitedInputFormat.java | 6 +-
.../flink/api/common/io/FileInputFormat.java | 6 +-
.../flink/api/common/io/FileOutputFormat.java | 6 +-
.../operators/base/BulkIterationBase.java | 6 +-
.../configuration/GlobalConfiguration.java | 10 +---
.../flink/core/fs/local/LocalFileSystem.java | 8 +--
.../java/org/apache/flink/util/LogUtils.java | 45 ---------------
.../api/common/io/BinaryInputFormatTest.java | 10 ----
.../io/DelimitedInputFormatSamplingTest.java | 5 --
.../api/common/io/DelimitedInputFormatTest.java | 11 +---
.../api/common/io/FileInputFormatTest.java | 8 ---
.../api/common/io/FileOutputFormatTest.java | 8 ---
.../common/io/GenericCsvInputFormatTest.java | 10 ----
.../api/common/io/SequentialFormatTest.java | 14 -----
.../typeutils/SerializerTestInstance.java | 3 +-
.../configuration/GlobalConfigurationTest.java | 10 ----
flink-core/src/test/resources/logback-test.xml | 29 ++++++++++
flink-dist/src/main/assemblies/yarn.xml | 2 +-
flink-dist/src/main/flink-bin/LICENSE | 14 ++++-
flink-dist/src/main/flink-bin/NOTICE | 15 +++--
flink-dist/src/main/flink-bin/bin/flink | 2 +-
flink-dist/src/main/flink-bin/bin/jobmanager.sh | 2 +-
.../src/main/flink-bin/bin/start-local.bat | 24 ++++----
.../src/main/flink-bin/bin/taskmanager.sh | 2 +-
flink-dist/src/main/flink-bin/bin/webclient.sh | 2 +-
.../src/main/flink-bin/conf/log4j.properties | 27 ---------
.../main/flink-bin/conf/log4jconsole.properties | 25 --------
flink-dist/src/main/flink-bin/conf/logback.xml | 31 ++++++++++
.../src/main/flink-bin/conf/logbackConsole.xml | 30 ++++++++++
.../src/main/flink-bin/yarn-bin/yarn-session.sh | 2 +-
.../apache/flink/api/java/LocalEnvironment.java | 40 -------------
.../flink/api/java/io/CsvOutputFormat.java | 6 +-
.../api/java/record/io/CsvOutputFormat.java | 6 +-
.../api/java/record/io/TextInputFormat.java | 6 +-
.../flink/api/java/io/CsvInputFormatTest.java | 11 +---
.../flink/api/java/io/TextInputFormatTest.java | 9 ---
.../api/java/record/io/CsvInputFormatTest.java | 10 ----
.../record/io/FixedLenghtInputFormatTest.java | 11 +---
.../api/java/record/io/TextInputFormatTest.java | 10 ----
.../apache/flink/runtime/client/JobClient.java | 6 +-
.../execution/ExecutionStateTransition.java | 6 +-
.../runtime/execution/RuntimeEnvironment.java | 8 +--
.../runtime/executiongraph/ExecutionGraph.java | 6 +-
.../executiongraph/ExecutionGraphIterator.java | 6 +-
.../executiongraph/ExecutionSignature.java | 6 +-
.../runtime/executiongraph/ExecutionVertex.java | 6 +-
.../runtime/fs/hdfs/DistributedFileSystem.java | 6 +-
.../flink/runtime/fs/maprfs/MapRFileSystem.java | 8 +--
.../flink/runtime/fs/s3/S3FileSystem.java | 6 +-
.../instance/DefaultInstanceManager.java | 6 +-
.../instance/HardwareDescriptionFactory.java | 8 +--
.../runtime/io/disk/iomanager/IOManager.java | 8 +--
.../runtime/io/network/ChannelManager.java | 6 +-
.../bufferprovider/GlobalBufferPool.java | 6 +-
.../io/network/channels/InputChannel.java | 6 +-
.../io/network/channels/OutputChannel.java | 6 +-
.../runtime/io/network/gates/InputGate.java | 6 +-
.../network/netty/InboundEnvelopeDecoder.java | 6 +-
.../network/netty/NettyConnectionManager.java | 6 +-
.../network/netty/OutboundConnectionQueue.java | 6 +-
.../org/apache/flink/runtime/ipc/Client.java | 12 ++--
.../java/org/apache/flink/runtime/ipc/RPC.java | 6 +-
.../org/apache/flink/runtime/ipc/Server.java | 8 +--
.../WorksetEmptyConvergenceCriterion.java | 6 +-
.../task/AbstractIterativePactTask.java | 6 +-
.../iterative/task/IterationHeadPactTask.java | 6 +-
.../task/IterationIntermediatePactTask.java | 6 +-
.../task/IterationSynchronizationSinkTask.java | 6 +-
.../iterative/task/IterationTailPactTask.java | 6 +-
.../iterative/task/SyncEventHandler.java | 2 +-
.../flink/runtime/jobmanager/JobManager.java | 30 +++-------
.../runtime/jobmanager/JobManagerUtils.java | 6 +-
.../jobmanager/scheduler/DefaultScheduler.java | 10 ++--
.../DefaultInputSplitAssigner.java | 6 +-
.../splitassigner/InputSplitManager.java | 6 +-
.../splitassigner/InputSplitTracker.java | 6 +-
.../LocatableInputSplitAssigner.java | 6 +-
.../splitassigner/LocatableInputSplitList.java | 6 +-
.../file/FileInputSplitAssigner.java | 6 +-
.../splitassigner/file/FileInputSplitList.java | 6 +-
.../jobmanager/web/JobmanagerInfoServlet.java | 6 +-
.../jobmanager/web/LogfileInfoServlet.java | 17 +++---
.../runtime/jobmanager/web/MenuServlet.java | 8 +--
.../jobmanager/web/SetupInfoServlet.java | 6 +-
.../runtime/jobmanager/web/WebInfoServer.java | 6 +-
.../ManagementGraphIterator.java | 6 +-
.../memorymanager/DefaultMemoryManager.java | 8 +--
.../org/apache/flink/runtime/net/NetUtils.java | 8 +--
.../flink/runtime/net/SocketIOWithTimeout.java | 6 +-
.../runtime/operators/AllGroupReduceDriver.java | 8 +--
.../runtime/operators/AllReduceDriver.java | 8 +--
.../flink/runtime/operators/CoGroupDriver.java | 6 +-
.../flink/runtime/operators/CrossDriver.java | 6 +-
.../flink/runtime/operators/DataSinkTask.java | 6 +-
.../flink/runtime/operators/DataSourceTask.java | 6 +-
.../operators/GroupReduceCombineDriver.java | 6 +-
.../runtime/operators/GroupReduceDriver.java | 8 +--
.../flink/runtime/operators/MatchDriver.java | 6 +-
.../runtime/operators/ReduceCombineDriver.java | 6 +-
.../flink/runtime/operators/ReduceDriver.java | 6 +-
.../runtime/operators/RegularPactTask.java | 6 +-
.../operators/hash/CompactingHashTable.java | 6 +-
.../operators/hash/MutableHashTable.java | 6 +-
.../AbstractBlockResettableIterator.java | 6 +-
.../resettable/BlockResettableIterator.java | 6 +-
.../BlockResettableMutableObjectIterator.java | 6 +-
.../resettable/SpillingResettableIterator.java | 6 +-
...SpillingResettableMutableObjectIterator.java | 6 +-
.../sort/CombiningUnilateralSortMerger.java | 6 +-
.../operators/sort/MergeMatchIterator.java | 6 +-
.../operators/sort/UnilateralSortMerger.java | 6 +-
.../flink/runtime/profiling/ProfilingUtils.java | 23 +++++---
.../profiling/impl/EnvironmentListenerImpl.java | 6 +-
.../profiling/impl/JobManagerProfilerImpl.java | 6 +-
.../profiling/impl/TaskManagerProfilerImpl.java | 8 +--
.../apache/flink/runtime/taskmanager/Task.java | 6 +-
.../flink/runtime/taskmanager/TaskManager.java | 26 ++++-----
.../runtime/util/EnvironmentInformation.java | 17 ++----
.../org/apache/flink/runtime/util/IOUtils.java | 4 +-
.../apache/flink/runtime/AbstractIDTest.java | 1 -
.../flink/runtime/client/JobResultTest.java | 5 +-
.../ChannelDeploymentDescriptorTest.java | 1 -
.../GateDeploymentDescriptorTest.java | 2 -
.../TaskDeploymentDescriptorTest.java | 3 -
.../flink/runtime/event/job/JobEventTest.java | 3 -
.../runtime/event/job/ManagementEventTest.java | 4 --
.../task/EventNotificationManagerTest.java | 6 --
.../flink/runtime/event/task/TaskEventTest.java | 6 +-
.../executiongraph/ExecutionGraphTest.java | 16 -----
.../FileCacheDeleteValidationTest.java | 1 -
.../apache/flink/runtime/fs/LineReaderTest.java | 2 +-
.../instance/DefaultInstanceManagerTest.java | 8 ---
.../io/disk/iomanager/IOManagerITCase.java | 14 ++---
.../IOManagerPerformanceBenchmark.java | 6 +-
.../bufferprovider/LocalBufferPoolTest.java | 8 ---
.../runtime/jobmanager/JobManagerITCase.java | 44 --------------
.../runtime/operators/DataSinkTaskTest.java | 19 +++---
.../runtime/operators/DataSourceTaskTest.java | 4 +-
.../flink/runtime/operators/MapTaskTest.java | 8 +--
.../operators/ReduceTaskExternalITCase.java | 14 ++---
.../flink/runtime/operators/ReduceTaskTest.java | 14 ++---
.../operators/chaining/ChainTaskTest.java | 11 ----
.../hash/HashFunctionCollisionBenchmark.java | 12 ++--
.../runtime/operators/hash/HashTableITCase.java | 4 +-
.../hash/ReOpenableHashTableITCase.java | 3 -
.../CombiningUnilateralSortMergerITCase.java | 15 +----
.../operators/sort/ExternalSortITCase.java | 8 +--
.../sort/SortMergeCoGroupIteratorITCase.java | 1 -
.../operators/testutils/DriverTestBase.java | 12 ----
.../src/test/resources/logback-test.xml | 36 ++++++++++++
.../test/compiler/util/CompilerTestBase.java | 2 +-
.../flink/test/util/AbstractTestBase.java | 4 --
.../KMeansIterativeNepheleITCase.java | 4 --
.../test/cancelling/CancellingTestBase.java | 17 ++----
.../clients/examples/LocalExecutorITCase.java | 2 -
.../compiler/plandump/PreviewPlanDumpTest.java | 2 +-
.../DiffL1NormConvergenceCriterion.java | 6 +-
.../PackagedProgramEndToEndITCase.java | 5 --
.../apache/flink/test/operators/JoinITCase.java | 6 +-
.../apache/flink/test/operators/MapITCase.java | 6 +-
.../flink/test/operators/UnionITCase.java | 6 +-
.../operators/io/ContractITCaseIOFormats.java | 6 +-
.../DiffL1NormConvergenceCriterion.java | 6 +-
.../test/recordJobs/relational/TPCHQuery4.java | 7 ++-
.../test/recordJobs/relational/TPCHQuery9.java | 7 ++-
.../relational/query1Util/LineItemFilter.java | 9 +--
.../query1Util/LineItemFilterTest.java | 2 +-
.../test/runtime/NetworkStackThroughput.java | 9 +--
.../apache/flink/test/util/FailingTestBase.java | 9 ---
flink-tests/src/test/resources/logback-test.xml | 34 +++++++++++
pom.xml | 35 ++++-------
236 files changed, 1068 insertions(+), 1227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/DEPENDENCIES
----------------------------------------------------------------------
diff --git a/DEPENDENCIES b/DEPENDENCIES
index 118a48e..5cf024e 100644
--- a/DEPENDENCIES
+++ b/DEPENDENCIES
@@ -17,7 +17,6 @@ under the Apache License (v 2.0):
- Apache Commons FileUpload (http://commons.apache.org/fileupload/)
- Apache Commons IO (http://commons.apache.org/io/)
- Apache Commons Math (http://commons.apache.org/proper/commons-math/)
- - Apache Log4J (http://logging.apache.org/log4j/1.2/)
- Apache Avro (http://avro.apache.org)
- Apache Hadoop (http://hadoop.apache.org)
- Apache Derby (http://db.apache.org/derby/)
@@ -43,6 +42,8 @@ The Apache Flink project depends on and/or bundles the following components
under the Eclipse Public License (v 1.0)
- JUnit (http://junit.org/)
+ - LOGback (http://logback.qos.ch)
+ Copyright (C) 1999-2012, QOS.ch. All rights reserved.
You may obtain a copy of the Eclipse Public License (v 1.0) at
https://www.eclipse.org/legal/epl-v10.html
@@ -188,14 +189,19 @@ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
-----------------------------------------------------------------------
- Apache Log4J
+ LOGBack
-----------------------------------------------------------------------
-ResolverUtil.java
-Copyright 2005-2006 Tim Fennell
+Copyright (C) 1999-2012, QOS.ch
-Dumbster SMTP test server
-Copyright 2004 Jason Paul Kitchen
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
+
+ or (per the licensee's choosing)
+
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
-----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/docs/internal_logging.md
----------------------------------------------------------------------
diff --git a/docs/internal_logging.md b/docs/internal_logging.md
new file mode 100644
index 0000000..6d95360
--- /dev/null
+++ b/docs/internal_logging.md
@@ -0,0 +1,61 @@
+---
+title: "How to use logging"
+---
+
+The logging in Flink is implemented using the slf4j logging interface. Logback is used as the underlying logging framework.
+
+## Configuring logback
+
+For users and developers alike it is important to control the logging framework.
+The configuration of the logging framework is exclusively done by configuration files.
+The configuration file has to be specified either by setting the Java system property `-Dlogback.configurationFile=<file>` or by putting `logback.xml` in the classpath.
+The `conf` directory contains a `logback.xml` file which can be modified and is used if Flink is started outside of an IDE and with the provided starting scripts.
+The provided `logback.xml` has the following form:
+
+``` xml
+<configuration>
+ <appender name="file" class="ch.qos.logback.core.FileAppender">
+ <file>${log.file}</file>
+ <append>false</append>
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="file"/>
+ </root>
+</configuration>
+```
+
+In order to control the logging level of `org.apache.flink.runtime.jobgraph.JobGraph`, for example, one would have to add the following line to the configuration file.
+``` xml
+<logger name="org.apache.flink.runtime.jobgraph.JobGraph" level="DEBUG"/>
+```
+
+For further information on configuring logback see [LOGback's manual](http://logback.qos.ch/manual/configuration.html).
+
+## Best practices for developers
+
+The loggers using slf4j are created by calling
+``` java
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
+
+Logger LOG = LoggerFactory.getLogger(Foobar.class)
+```
+
+In order to benefit most from slf4j, it is recommended to use its placeholder mechanism.
+Using placeholders avoids unnecessary string construction when the logging level is set so high that the message would not be logged.
+The syntax of placeholders is the following:
+``` java
+LOG.info("This message contains {} placeholders. {}", 2, "Yippie");
+```
+
+Placeholders can also be used together with exceptions that should be logged; the exception is passed as the last argument.
+
+``` java
+catch(Exception exception){
+ LOG.error("An {} occurred.", "error", exception);
+}
+```
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/docs/internal_overview.md
----------------------------------------------------------------------
diff --git a/docs/internal_overview.md b/docs/internal_overview.md
index f3090d0..4c71c6e 100644
--- a/docs/internal_overview.md
+++ b/docs/internal_overview.md
@@ -39,3 +39,4 @@ or pull request that updates these documents as well.*
- [RPC and JobManager Communication](rpc_transfer.html)
-->
+- [How-to: Using logging in Flink](internal_logging.html)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 1031d81..a85eb22 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -27,8 +27,8 @@ import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -43,7 +43,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
private final Class<E> avroValueType;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
index ab96895..2f31837 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
@@ -27,8 +27,8 @@ import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.AvroBaseValue;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.java.record.io.FileInputFormat;
@@ -42,7 +42,7 @@ public class AvroInputFormat<E> extends FileInputFormat {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
index 1464ca9..be13114 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
@@ -32,8 +32,8 @@ import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.java.record.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
@@ -64,7 +64,7 @@ import org.apache.flink.types.Value;
public class AvroRecordInputFormat extends FileInputFormat {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(AvroRecordInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
private FileReader<GenericRecord> dataFileReader;
private GenericRecord reuseAvroRecord = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 74a1e15..2e4b141 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.LogUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -36,10 +35,6 @@ public class AvroExternalJarProgramITCase {
private static final String TEST_DATA_FILE = "/testdata.avro";
- static {
- LogUtils.initializeDefaultTestConsoleLogger();
- }
-
@Test
public void testExternalProgram() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 76b23ef..05ad284 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -31,8 +31,6 @@ import java.util.Random;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.DataInputDecoder;
-import org.apache.flink.api.avro.DataOutputEncoder;
import org.apache.flink.api.java.record.io.avro.generated.Colors;
import org.apache.flink.api.java.record.io.avro.generated.User;
import org.apache.flink.util.StringUtils;
@@ -40,12 +38,10 @@ import org.junit.Test;
import static org.junit.Assert.*;
-
/**
* Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
*/
public class EncoderDecoderTest {
-
@Test
public void testComplexStringsDirecty() {
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
index 64fae94..e0610f0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
@@ -29,7 +29,6 @@ import org.junit.Assert;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat;
import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
@@ -68,8 +67,7 @@ public class AvroRecordInputFormatTest {
final static long TEST_MAP_VALUE1 = 8546456L;
final static CharSequence TEST_MAP_KEY2 = "KEY 2";
final static long TEST_MAP_VALUE2 = 17554L;
-
-
+
@Before
public void createFiles() throws IOException {
testFile = File.createTempFile("AvroInputFormatTest", null);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/resources/logback-test.xml b/flink-addons/flink-avro/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index 030d7f2..c224d64 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -24,8 +24,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -51,7 +51,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
private Class<K> keyClass;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index cf12cae..de443c1 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -25,8 +25,8 @@ import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -54,7 +54,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
private Class<K> keyClass;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 9ff5af7..fdf3c6c 100644
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.addons.hbase.common.HBaseKey;
import org.apache.flink.addons.hbase.common.HBaseResult;
import org.apache.flink.addons.hbase.common.HBaseUtil;
@@ -51,7 +51,7 @@ public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
/** A handle on an HBase table */
private HTable table;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index ac8bc07..b17f658 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -26,8 +26,8 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.tuple.Tuple;
@@ -48,7 +48,7 @@ public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, Inpu
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
- private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
private String username;
private String password;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 3a75480..9e08206 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -24,8 +24,8 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
@@ -42,7 +42,7 @@ public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
- private static final Log LOG = LogFactory.getLog(JDBCOutputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
private String username;
private String password;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
index f2930f2..9a4d927 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
@@ -25,8 +25,8 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.java.record.io.GenericInputFormat;
import org.apache.flink.configuration.Configuration;
@@ -56,7 +56,7 @@ public class JDBCInputFormat extends GenericInputFormat implements NonParallelIn
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
- private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
public final String DRIVER_KEY = "driver";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 41e9c5a..84895bf 100644
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -26,7 +26,6 @@ import java.sql.Statement;
import org.junit.Assert;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index a101a74..904c6d8 100644
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -26,8 +26,6 @@ import java.sql.Statement;
import org.junit.Assert;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
index 994eaf5..fd0764e 100644
--- a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
@@ -26,8 +26,6 @@ import java.sql.Statement;
import org.junit.Assert;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.FloatValue;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/resources/logback-test.xml b/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-spargel/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/resources/logback-test.xml b/flink-addons/flink-spargel/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index 2c61fc9..3974720 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -52,6 +52,16 @@ under the License.
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 5282704..c4618a7 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -17,8 +17,8 @@
package org.apache.flink.streaming.connectors.flume;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flume.Event;
@@ -31,7 +31,7 @@ import org.apache.flume.event.EventBuilder;
public abstract class FlumeSink<IN> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(FlumeSink.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
private transient FlinkRpcClientFacade client;
boolean initDone = false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index a809c7a..ae04298 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import com.rabbitmq.client.Channel;
@@ -30,7 +30,7 @@ import com.rabbitmq.client.ConnectionFactory;
public abstract class RMQSink<IN> implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(RMQSink.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
private boolean sendAndClose = false;
private boolean closeWithoutSend = false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 30c9097..1fcd57b 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -1,4 +1,4 @@
-/**
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -12,15 +12,15 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
*/
package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import com.rabbitmq.client.QueueingConsumer;
public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(RMQSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
private final String QUEUE_NAME;
private final String HOST_NAME;
@@ -86,9 +86,7 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
try {
delivery = consumer.nextDelivery();
} catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
- }
+ LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
}
outTuple = deserialize(delivery.getBody());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 8a1e29e..4aa7a43 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -25,10 +25,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
@@ -46,7 +46,7 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
*/
public class TwitterSource extends RichSourceFunction<String> {
- private static final Log LOG = LogFactory.getLog(TwitterSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
private static final long serialVersionUID = 1L;
private String authPath;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 77c860b..aaa7161 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -22,8 +22,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -54,7 +54,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
*/
public class JobGraphBuilder {
- private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
private final JobGraph jobGraph;
// Graph attributes
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 162c5df..54b1a98 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -1,4 +1,4 @@
-/**
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -12,21 +12,21 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
*/
package org.apache.flink.streaming.api.collector;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A StreamCollector that uses user defined output names and a user defined
@@ -38,7 +38,7 @@ import org.apache.flink.util.StringUtils;
public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
OutputSelector<OUT> outputSelector;
- private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class);
private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 65098d4..c6ba1ef 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -1,4 +1,4 @@
-/**
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -12,7 +12,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
*/
package org.apache.flink.streaming.api.collector;
@@ -22,13 +22,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Collector for tuples in Apache Flink stream processing. The collected values
@@ -40,7 +40,7 @@ import org.apache.flink.util.StringUtils;
*/
public class StreamCollector<OUT> implements Collector<OUT> {
- private static final Log LOG = LogFactory.getLog(StreamCollector.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class);
protected StreamRecord<OUT> streamRecord;
protected int channelID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9169d65..b2fcf89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -21,8 +21,8 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -32,7 +32,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
- private static final Log LOG = LogFactory.getLog(RemoteStreamEnvironment.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
private String host;
private int port;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
index d92d1f0..799f647 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
@@ -17,14 +17,14 @@
package org.apache.flink.streaming.api.invokable;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The StreamOperatorInvokable represents the base class for all operators in
@@ -42,7 +42,7 @@ public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<O
}
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(StreamInvokable.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorInvokable.class);
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
protected StreamRecordSerializer<IN> serializer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 7b4f3ff..8a1a637 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.streamcomponent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
@@ -30,7 +30,7 @@ import org.apache.flink.util.StringUtils;
public class StreamIterationSink<IN extends Tuple> extends
AbstractStreamComponent {
- private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
private InputHandler<IN> inputHandler;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 7de80b2..ab02d84 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -21,8 +21,8 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.io.BlockingQueueBroker;
public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
- private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSource.class);
private OutputHandler<OUT> outputHandler;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 145b709..0797cc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -17,13 +17,13 @@
package org.apache.flink.streaming.api.streamcomponent;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StreamSink<IN> extends AbstractStreamComponent {
- private static final Log LOG = LogFactory.getLog(StreamSink.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
private InputHandler<IN> inputHandler;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 50a78b3..0853ec7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,17 +19,17 @@ package org.apache.flink.streaming.util;
import java.net.InetSocketAddress;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClusterUtil {
- private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java
deleted file mode 100644
index 82a6119..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/LogUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-
-public class LogUtils {
-
- public static void initializeDefaultConsoleLogger() {
- initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
- }
-
- public static void initializeDefaultConsoleLogger(Level logLevel, Level rootLevel) {
- Logger logger = Logger.getLogger("org.apache.flink.streaming");
- logger.removeAllAppenders();
- logger.setAdditivity(false);
- PatternLayout layout = new PatternLayout();
- // layout.setConversionPattern("%highlight{%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n}");
- // TODO Add highlight
- layout.setConversionPattern("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
- ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
- logger.addAppender(appender);
- logger.setLevel(logLevel);
-
- Logger root = Logger.getRootLogger();
- root.removeAllAppenders();
- root.addAppender(appender);
- root.setLevel(rootLevel);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
index 9a19256..53eafaa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
@@ -30,14 +30,14 @@ import java.net.URL;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestDataUtil {
// TODO: Exception handling
// TODO: check checksum after download
- private static final Log LOG = LogFactory.getLog(TestDataUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestDataUtil.class);
public static final String testDataDir = "src/test/resources/testdata/";
public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
public static final String testChekSumDir = "src/test/resources/testdata_checksum/";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index da9de05..83f7e8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.LogUtils;
import org.junit.Test;
public class IterateTest {
@@ -74,8 +73,6 @@ public class IterateTest {
@Test
public void test() throws Exception {
- LogUtils.initializeDefaultTestConsoleLogger();
-
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.setBufferTimeout(10);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 43c36a0..8baa99f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.LogUtils;
import org.junit.Test;
public class PrintTest{
@@ -48,8 +47,6 @@ public class PrintTest{
@Test
public void test() throws Exception {
- LogUtils.initializeDefaultTestConsoleLogger();
-
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
env.executeTest(MEMORYSIZE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 417ed85..7e87537 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.LogUtils;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -133,9 +132,6 @@ public class WriteAsCsvTest {
@Test
public void test() throws Exception {
-
- LogUtils.initializeDefaultTestConsoleLogger();
-
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index eea97ec..df970ae 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.LogUtils;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -133,9 +132,6 @@ public class WriteAsTextTest {
@Test
public void test() throws Exception {
-
- LogUtils.initializeDefaultTestConsoleLogger();
-
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 1580235..1608b7b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
import org.junit.Test;
public class DirectedOutputTest {
@@ -90,7 +88,6 @@ public class DirectedOutputTest {
@SuppressWarnings("unused")
@Test
public void directOutputTest() throws Exception {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector(),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index b31b39c..e7f12bd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -29,10 +29,8 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.LogUtils;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
import org.junit.Test;
public class CoFlatMapTest implements Serializable {
@@ -70,8 +68,6 @@ public class CoFlatMapTest implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void multipleInputTest() {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
DataStream<Integer> ds1 = env.fromElements(1, 3, 5);