Posted to user@flink.apache.org by JP de Vooght <jp...@vooght.de> on 2018/02/27 17:25:42 UTC
Re: logging question
Hello Nico,
it took me a while to respond. Thank you for the comments. I explored the
Docker image and startup scripts a little more. That allowed me to
better understand which log4j properties file is used, but I am still
facing this odd behavior.
I created a stackoverflow entry for this
https://stackoverflow.com/questions/48853497/docker-flink-not-showing-all-log-statements
Below is the properties file, which I hadn't put on SO.
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file, console
# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=OFF
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
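
To make the effect of the file above easier to reason about, here is a minimal Java sketch of how log4j 1.x picks a logger's effective level: the nearest configured ancestor in the dot-separated name hierarchy wins, otherwise the root logger's level applies. It uses only java.util.Properties rather than log4j itself. The class EffectiveLevelSketch and its methods are hypothetical names for illustration, and appenders, additivity and thresholds are deliberately ignored.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of log4j or Flink: it models level inheritance only.
final class EffectiveLevelSketch {

    // Parse log4j-1.x-style properties text with the JDK's own Properties loader.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Walk from the logger name up through its dot-separated ancestors and
    // return the first configured level; fall back to the root logger.
    static String effectiveLevel(Properties props, String loggerName) {
        String name = loggerName;
        while (true) {
            String value = props.getProperty("log4j.logger." + name);
            if (value != null) {
                return levelOf(value);
            }
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                break;
            }
            name = name.substring(0, dot);
        }
        // log4j 1.x defaults the root logger to DEBUG when nothing is configured.
        return levelOf(props.getProperty("log4j.rootLogger", "DEBUG"));
    }

    // A value looks like "LEVEL, appender1, appender2"; keep only the level.
    // Simplification: a value that starts with a comma yields an empty level here.
    private static String levelOf(String value) {
        return value.split(",", 2)[0].trim();
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "log4j.rootLogger=INFO, file, console",
                "log4j.logger.org.apache.flink=OFF",
                "log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file"));

        System.out.println(effectiveLevel(props, ">>>>> ParallelMLExample"));
        System.out.println(effectiveLevel(props, "org.apache.flink.runtime.taskmanager.Task"));
        System.out.println(effectiveLevel(props, "org.jboss.netty.channel.DefaultChannelPipeline"));
    }
}
```

Under this model the logger named ">>>>> ParallelMLExample" has no configured ancestor and falls back to the root level INFO, while everything under org.apache.flink is switched off.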
JP
On 01/16/2018 10:50 AM, Nico Kruber wrote:
> Just a guess, but probably our logging initialisation changes the global
> log level (see conf/log4j.properties). DataStream.collect() executes the
> program along with creating a local Flink "cluster" (if you are testing
> locally / in an IDE) and initializing logging, among other things.
>
> Please comment the first line out and uncomment the following one to
> read like this:
> ==========
> # This affects logging for both user code and Flink
> #log4j.rootLogger=INFO, file
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=INFO
> ==========
>
>
> Nico
>
> On 13/01/18 13:52, jp@vooght.de wrote:
>> Hello,
>> I am learning Flink and using the docker image along with the AMIDST
>> library for this.
>> Below is a sample task from AMIDST which provides INFO output up until I
>> reach updateModel(). I pasted the short method as well and wonder what
>> prevents the Logger from
>>
>> //Set-up Flink session
>> env = ExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().disableSysoutLogging();
>> Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>
>> //generate a random dataset
>> DataFlink<DataInstance> dataFlink = new
>> DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>
>> //Creates a DAG with the NaiveBayes structure for the random dataset
>> DAG dag =
>> DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
>> "DiscreteVar4");
>> LOG.info(dag.toString());
>>
>> //Create the Learner object
>> ParameterLearningAlgorithm learningAlgorithmFlink = new
>> ParallelMaximumLikelihood();
>>
>> //Learning parameters
>> learningAlgorithmFlink.setBatchSize(10);
>> learningAlgorithmFlink.setDAG(dag);
>>
>> //Initialize the learning process
>> learningAlgorithmFlink.initLearning();
>>
>> //Learn from the flink data
>> LOG.info("BEFORE UPDATEMODEL");
>> learningAlgorithmFlink.updateModel(dataFlink);
>> LOG.info("AFTER UPDATEMODEL");
>>
>> //Print the learnt Bayes Net
>> BayesianNetwork bn =
>> learningAlgorithmFlink.getLearntBayesianNetwork();
>> LOG.info(bn.toString());
>>
>>
>> Below is the updateModel method.
>>
>> public double updateModel(DataFlink<DataInstance> dataUpdate) {
>> try {
>> Configuration config = new Configuration();
>> config.setString(BN_NAME, this.dag.getName());
>> config.setBytes(EFBN_NAME,
>> Serialization.serializeObject(efBayesianNetwork));
>>
>> DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>> this.sumSS = dataset.map(new SufficientSatisticsMAP())
>> .withParameters(config)
>> .reduce(new SufficientSatisticsReduce())
>> .collect().get(0);
>>
>> //Add the prior
>> sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>
>> JobExecutionResult result =
>> dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>
>> numInstances =
>> result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
>>
>> numInstances++;//Initial counts
>>
>> }catch(Exception ex){
>> throw new UndeclaredThrowableException(ex);
>> }
>>
>> return this.getLogMarginalProbability();
>> }
>>
>>
>> Not sure why the LOG.info calls past that method are not output to the console.
>> TIA
>> JP
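
Nico's guess in the quoted reply above is that logging gets (re)initialised while collect() runs and that this changes the effective log level. The thread does not confirm this as the cause. The sketch below only illustrates the mechanism, and it uses java.util.logging from the JDK as a stand-in, not log4j or Flink. The class ReconfigureSketch, the CollectingHandler and the logger name are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical illustration with java.util.logging (NOT log4j/Flink):
// a level change between two log calls silences the second one.
final class ReconfigureSketch {

    // Collects messages in memory so the effect can be inspected.
    static final class CollectingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    static List<String> run() {
        Logger log = Logger.getLogger("ReconfigureSketch.demo");
        log.setUseParentHandlers(false); // keep the demo off the real console
        CollectingHandler handler = new CollectingHandler();
        log.addHandler(handler);
        log.setLevel(Level.INFO);

        log.info("BEFORE UPDATEMODEL");

        // Stand-in for a library call that re-initialises logging as a side effect.
        log.setLevel(Level.OFF);

        log.info("AFTER UPDATEMODEL"); // dropped: the logger is now OFF

        log.removeHandler(handler);
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If something similar happens inside updateModel(), statements logged before the call would appear and those after it would not, which matches the symptom described.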
Re: logging question
Posted by JP de Vooght <jp...@vooght.de>.
In the docker-compose.yaml I have a volume entry which maps my
log4j.properties to /opt/flink/conf/log4j-console.properties.
Not pretty, but it works, now that I have determined how it was being launched.
See below.
version: "2.1"
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - /c/Users/XYZ/playground/flink/shared/log4j.properties:/opt/flink/conf/log4j-console.properties
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - FLINK_ENV_JAVA_OPTS=-Dlog.file=/opt/flink/log/jobmanager.log
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    volumes:
      - /c/Users/XYZ/playground/flink/shared/log4j.properties:/opt/flink/conf/log4j-console.properties
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - FLINK_ENV_JAVA_OPTS=-Dlog.file=/opt/flink/log/taskmanager.log
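
The -Dlog.file=... entries in the compose file above matter because the properties file refers to ${log.file}, and log4j 1.x fills such placeholders from JVM system properties when it reads the configuration. The following is a rough Java sketch of that substitution, not log4j's actual code. The class SubstitutionSketch is hypothetical, and the fallback to the properties file plus the empty string for unknown keys reflect my understanding of log4j 1.x rather than anything stated in this thread.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of ${key} substitution as applied to log4j 1.x property values.
final class SubstitutionSketch {

    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace each ${key} with the JVM system property of that name,
    // then with a value from the fallback properties, else with "".
    static String substitute(String value, Properties fallback) {
        Matcher m = VAR.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String replacement = System.getProperty(key);
            if (replacement == null) {
                replacement = fallback.getProperty(key, "");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Equivalent to starting the JVM with -Dlog.file=/opt/flink/log/jobmanager.log
        System.setProperty("log.file", "/opt/flink/log/jobmanager.log");
        System.out.println(substitute("${log.file}", new Properties()));
    }
}
```

Without the system property, the file appender in this model would get an empty file name, which is one reason to pass -Dlog.file explicitly when the usual startup script is bypassed.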
On 02/28/2018 04:55 PM, Nico Kruber wrote:
> I'm a bit curious about how you hand your log4j configuration into the
> Docker image for consumption. On SO you are referring to
> bin/flink-console.sh, but executing Flink in Docker is a bit different.
> Maybe I'm wrong, but looking at the sources of the docker image [1], it
> will not forward any additional parameters to the docker container via
> additions to the command starting the docker image.
>
>
> Nico
>
> [1]
> https://github.com/docker-flink/docker-flink/tree/master/1.4/hadoop28-scala_2.11-alpine
Re: logging question
Posted by Nico Kruber <ni...@data-artisans.com>.
I'm a bit curious about how you hand your log4j configuration into the
Docker image for consumption. On SO you are referring to
bin/flink-console.sh, but executing Flink in Docker is a bit different.
Maybe I'm wrong, but looking at the sources of the docker image [1], it
will not forward any additional parameters to the docker container via
additions to the command starting the docker image.
Nico
[1]
https://github.com/docker-flink/docker-flink/tree/master/1.4/hadoop28-scala_2.11-alpine