Posted to user@flink.apache.org by JP de Vooght <jp...@vooght.de> on 2018/02/27 17:25:42 UTC

Re: logging question

Hello Nico,

It took me a while to respond. Thank you for the comments. I explored the
Docker image and startup scripts a little more. That allowed me to
better understand which log4j properties file is used, but I am still
facing this odd behavior.

I created a Stack Overflow entry for this:

https://stackoverflow.com/questions/48853497/docker-flink-not-showing-all-log-statements

Below is the properties file, which I hadn't put on SO.

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file, console
 
# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=OFF
 
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
 
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
# Suppress the irrelevant (wrong) warnings
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
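
To make the effect of the properties above easier to reason about, here is a minimal sketch of how a log4j 1.x style hierarchy picks the level for a logger: it uses the level configured on the logger itself, otherwise the nearest dot-separated ancestor, otherwise the root logger. This is a simplified model written for illustration, not log4j's own code. The class name EffectiveLevel and the method resolve are hypothetical, and only the logger names taken from the file and the example above are real.

```java
import java.util.HashMap;
import java.util.Map;

public class EffectiveLevel {

    // Returns the level a logger would inherit in this simplified model:
    // the logger's own configured level, else the nearest configured
    // dot-separated ancestor, else the root level.
    public static String resolve(String loggerName,
                                 Map<String, String> configured,
                                 String rootLevel) {
        String name = loggerName;
        while (true) {
            String level = configured.get(name);
            if (level != null) {
                return level;
            }
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                return rootLevel; // no ancestor left, fall back to root
            }
            name = name.substring(0, dot); // move up to the parent logger
        }
    }

    public static void main(String[] args) {
        // Levels mirrored from the properties file above.
        Map<String, String> configured = new HashMap<>();
        configured.put("org.apache.flink", "OFF");
        configured.put("akka", "INFO");
        configured.put("org.apache.hadoop.util.NativeCodeLoader", "OFF");

        // A logger below org.apache.flink inherits OFF.
        System.out.println(resolve("org.apache.flink.runtime.taskmanager.Task",
                configured, "INFO")); // prints OFF

        // The user logger from the example has no configured ancestor,
        // so it falls back to the root level.
        System.out.println(resolve(">>>>> ParallelMLExample",
                configured, "INFO")); // prints INFO
    }
}
```

Under this model the user logger should still log at INFO with the file above, which suggests the missing output is not explained by the level settings alone.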

JP


On 01/16/2018 10:50 AM, Nico Kruber wrote:
> Just a guess, but probably our logging initialisation changes the global
> log level (see conf/log4j.properties). DataSet.collect() executes the
> program along with creating a local Flink "cluster" (if you are testing
> locally / in an IDE) and initializing logging, among other things.
>
> Please comment the first line out and uncomment the following one to
> read like this:
> ==========
> # This affects logging for both user code and Flink
> #log4j.rootLogger=INFO, file
>
> # Uncomment this if you want to _only_ change Flink's logging
> log4j.logger.org.apache.flink=INFO
> ==========
>
>
> Nico
>
> On 13/01/18 13:52, jp@vooght.de wrote:
>> Hello,
>> I am learning Flink and using the docker image along with the AMIDST
>> library for this.
>> Below is a sample task from AMIDST which provides INFO output up until I
>> reach updateModel(). I pasted the short method as well and wonder what
>> prevents the Logger from
>>
>>         //Set-up Flink session
>>         env = ExecutionEnvironment.getExecutionEnvironment();
>>         env.getConfig().disableSysoutLogging();
>>         Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>
>>         //generate a random dataset
>>         DataFlink<DataInstance> dataFlink = new DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>
>>         //Creates a DAG with the NaiveBayes structure for the random dataset
>>         DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(), "DiscreteVar4");
>>         LOG.info(dag.toString());
>>
>>         //Create the Learner object
>>         ParameterLearningAlgorithm learningAlgorithmFlink = new ParallelMaximumLikelihood();
>>
>>         //Learning parameters
>>         learningAlgorithmFlink.setBatchSize(10);
>>         learningAlgorithmFlink.setDAG(dag);
>>
>>         //Initialize the learning process
>>         learningAlgorithmFlink.initLearning();
>>
>>         //Learn from the flink data
>>         LOG.info("BEFORE UPDATEMODEL");
>>         learningAlgorithmFlink.updateModel(dataFlink);
>>         LOG.info("AFTER UPDATEMODEL");
>>
>>         //Print the learnt Bayes Net
>>         BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
>>         LOG.info(bn.toString());
>>
>>
>> Below is the updateModel method.
>>
>>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>>         try {
>>             Configuration config = new Configuration();
>>             config.setString(BN_NAME, this.dag.getName());
>>             config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));
>>
>>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>                     .withParameters(config)
>>                     .reduce(new SufficientSatisticsReduce())
>>                     .collect().get(0);
>>
>>             //Add the prior
>>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>
>>             JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>
>>             numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
>>
>>             numInstances++;//Initial counts
>>
>>         }catch(Exception ex){
>>             throw new UndeclaredThrowableException(ex);
>>         }
>>
>>         return this.getLogMarginalProbability();
>>     }
>>
>>
>> Not sure why the LOG.info calls past that method are not output to the console.
>> TIA
>> JP


Re: logging question

Posted by JP de Vooght <jp...@vooght.de>.
In the docker-compose.yaml I have a volume entry which maps my
log4j.properties to /opt/flink/conf/log4j-console.properties.

Not pretty, but it works after I determined how it was being launched.
See below.


version: "2.1"
 
services:
  jobmanager:
    image: flink
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - /c/Users/XYZ/playground/flink/shared/log4j.properties:/opt/flink/conf/log4j-console.properties
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - FLINK_ENV_JAVA_OPTS=-Dlog.file=/opt/flink/log/jobmanager.log
 
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    volumes:
      - /c/Users/XYZ/playground/flink/shared/log4j.properties:/opt/flink/conf/log4j-console.properties
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - FLINK_ENV_JAVA_OPTS=-Dlog.file=/opt/flink/log/taskmanager.log
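
The compose file passes -Dlog.file=... as a JVM option, and the properties file refers to it as ${log.file}. The following is a minimal sketch of that kind of placeholder substitution, assuming a plain ${key} syntax. It is a simplified model for illustration rather than log4j's actual implementation. The class name LogFileSubstitution and the method substitute are hypothetical, and leaving unknown placeholders untouched is a choice made for this sketch only.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogFileSubstitution {

    // Matches placeholders of the form ${key}.
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${key} in value with the matching entry from props.
    // Sketch choice: placeholders without a matching entry are left as-is.
    public static String substitute(String value, Properties props) {
        Matcher m = VAR.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = props.getProperty(m.group(1));
            String text = (replacement != null) ? replacement : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Stand-in for the JVM system properties set via -Dlog.file=...
        Properties props = new Properties();
        props.setProperty("log.file", "/opt/flink/log/jobmanager.log");

        // Value of log4j.appender.file.file from the properties file.
        System.out.println(substitute("${log.file}", props));
        // prints /opt/flink/log/jobmanager.log
    }
}
```

In a running container, System.getProperty("log.file") can be checked the same way to confirm that the option from FLINK_ENV_JAVA_OPTS actually reached the JVM.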


On 02/28/2018 04:55 PM, Nico Kruber wrote:
> I'm a bit curious about how you hand your log4j configuration into the
> docker image for consumption. On SO you are referring to
> bin/flink-console.sh but executing Flink in docker is a bit different.
> Maybe I'm wrong, but looking at the sources of the docker image [1], it
> will not forward any additional parameters to the docker container via
> additions to the command starting the docker image.
>
>
> Nico
>
> [1]
> https://github.com/docker-flink/docker-flink/tree/master/1.4/hadoop28-scala_2.11-alpine
>


Re: logging question

Posted by Nico Kruber <ni...@data-artisans.com>.
I'm a bit curious about how you hand your log4j configuration into the
docker image for consumption. On SO you are referring to
bin/flink-console.sh but executing Flink in docker is a bit different.
Maybe I'm wrong, but looking at the sources of the docker image [1], it
will not forward any additional parameters to the docker container via
additions to the command starting the docker image.


Nico

[1]
https://github.com/docker-flink/docker-flink/tree/master/1.4/hadoop28-scala_2.11-alpine
