Posted to user@flink.apache.org by JP de Vooght <jp...@vooght.de> on 2018/05/24 15:55:58 UTC

Re: logging question

I finally sorted my problem out by using the CLI instead of the Web UI and
debugging with simple System.out.println() statements. I noticed that a
local installation launched with start-cluster.sh worked A-OK.

To reproduce a simple learning environment with Docker images, I ended
up creating my own image without that entrypoint.sh and ensuring the
CLI would run from an image on the same bridge network. That image also
includes some opt libraries. That's about it, really.

docker run --rm -t --net "amidst_default" \
  -v /home/jdevoo/playground/amidst/target:/opt/flink/target \
  flink flink run -m jobmanager:6123 target/ParallelMLExample-0.0.1.jar

docker-compose.yaml is below...
version: "2.0"
services:
  jobmanager:
    image: flink
    volumes:
      - /home/jdevoo/playground/amidst/conf:/opt/flink/conf
    ports:
      - "8081:8081"
    command: jobmanager.sh start-foreground cluster
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink
    volumes:
      - /home/jdevoo/playground/amidst/conf:/opt/flink/conf
    depends_on:
      - jobmanager
    command: taskmanager.sh start-foreground
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

Going to enjoy some AMIDST solutions to ML problems now :-)

JP


On 04/07/2018 06:31 PM, JP de Vooght wrote:
> Just to recap:
>
> I use Flink 1.4.2 with Docker Compose, which launches a jobmanager and
> a taskmanager.
> My hope is to learn another library that can be used with Flink, so
> logging is important to me.
> I start the cluster and deploy the following task (I dropped all calls
> to that library so I can focus on plain Flink and Docker):
>
> import java.util.List;
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class ParallelMLExample {
>     private static final Logger LOG =
>             LoggerFactory.getLogger(ParallelMLExample.class);
>
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env;
>
>         //Set-up Flink session
>         env = ExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>
>         DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
>
>         LOG.info("########## BEFORE UPDATEMODEL ##########");
>         List<Integer> collect = amounts.filter(a -> a > 30)
>                 .reduce((integer, t1) -> integer + t1)
>                 .collect();
>         LOG.info("########## AFTER UPDATEMODEL ##########");
>
>         LOG.info(collect.get(0).toString());
>     }
> }
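For reference, the pipeline keeps only the amounts greater than 30 and sums them, so the single collected element should be 40 + 50 = 90. The sketch below reproduces that arithmetic with plain java.util.stream rather than Flink (the class and method names are hypothetical), which can help confirm what the job ought to log after "AFTER UPDATEMODEL".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Plain-JDK sketch (not Flink) of the arithmetic in ParallelMLExample:
// keep the amounts above a threshold, then sum what remains.
class FilterReduceSketch {

    // Mirrors amounts.filter(a -> a > threshold).reduce((x, y) -> x + y).
    // Like a reduce without an identity value, it yields an empty Optional
    // when no element passes the filter.
    static Optional<Integer> filterAndSum(List<Integer> amounts, int threshold) {
        return amounts.stream()
                .filter(a -> a > threshold)
                .reduce(Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> amounts = Arrays.asList(1, 29, 40, 50);
        // Only 40 and 50 exceed 30, so this prints Optional[90]
        System.out.println(filterAndSum(amounts, 30));
    }
}
```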
>
> The jobmanager log output does not show anything after "BEFORE
> UPDATEMODEL":
>
> $ docker-compose up
> Starting flink_jobmanager_1
> Starting flink_taskmanager_1
> Attaching to flink_jobmanager_1, flink_taskmanager_1
> jobmanager_1   | Starting Job Manager
> jobmanager_1   | config file:
> jobmanager_1   | jobmanager.rpc.address: jobmanager
> jobmanager_1   | jobmanager.rpc.port: 6123
> jobmanager_1   | jobmanager.heap.mb: 1024
> jobmanager_1   | taskmanager.heap.mb: 1024
> jobmanager_1   | taskmanager.numberOfTaskSlots: 1
> jobmanager_1   | taskmanager.memory.preallocate: false
> jobmanager_1   | parallelism.default: 1
> jobmanager_1   | web.port: 8081
> jobmanager_1   | blob.server.port: 6124
> jobmanager_1   | query.server.port: 6125
> jobmanager_1   | blob.server.port: 6124
> jobmanager_1   | query.server.port: 6125
> jobmanager_1   | blob.server.port: 6124
> jobmanager_1   | query.server.port: 6125
> taskmanager_1  | Starting Task Manager
> taskmanager_1  | config file:
> taskmanager_1  | jobmanager.rpc.address: jobmanager
> taskmanager_1  | jobmanager.rpc.port: 6123
> taskmanager_1  | jobmanager.heap.mb: 1024
> taskmanager_1  | taskmanager.heap.mb: 1024
> taskmanager_1  | taskmanager.numberOfTaskSlots: 4
> taskmanager_1  | taskmanager.memory.preallocate: false
> taskmanager_1  | parallelism.default: 1
> taskmanager_1  | web.port: 8081
> taskmanager_1  | blob.server.port: 6124
> taskmanager_1  | query.server.port: 6125
> taskmanager_1  | blob.server.port: 6124
> taskmanager_1  | query.server.port: 6125
> taskmanager_1  | blob.server.port: 6124
> taskmanager_1  | query.server.port: 6125
> jobmanager_1   | Starting jobmanager as a console application on host e207d6ad4a1a.
> taskmanager_1  | Starting taskmanager as a console application on host 1d724ce8ae5e.
> jobmanager_1   | Slf4jLogger started
> taskmanager_1  | Slf4jLogger started
> taskmanager_1  | Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
> taskmanager_1  | Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
> jobmanager_1   | ########## BEFORE UPDATEMODEL ##########
> taskmanager_1  | The operator name DataSource (at main(ParallelMLExample.java:30) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.
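The configuration printed at start-up lists blob.server.port and query.server.port three times each, which hints that the same settings were appended to the config more than once. A minimal sketch for spotting such repeats, assuming simple "key: value" lines (it is a line-based heuristic, not a YAML parser, and the class and method names are hypothetical, not part of Flink):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink): counts keys in "key: value" lines,
// such as the config dump printed at container start-up, and returns the keys
// seen more than once. Line-based heuristic only, not a YAML parser.
class DuplicateConfigKeys {

    static Map<String, Integer> findDuplicates(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            int colon = trimmed.indexOf(':');
            if (trimmed.startsWith("#") || colon <= 0) {
                continue; // skip comments, blank lines and lines without a key
            }
            counts.merge(trimmed.substring(0, colon).trim(), 1, Integer::sum);
        }
        Map<String, Integer> duplicates = new LinkedHashMap<>();
        counts.forEach((key, count) -> {
            if (count > 1) {
                duplicates.put(key, count);
            }
        });
        return duplicates;
    }

    public static void main(String[] args) {
        // Excerpt of the jobmanager config dump from the log above
        List<String> dump = Arrays.asList(
                "jobmanager.rpc.address: jobmanager",
                "jobmanager.rpc.port: 6123",
                "web.port: 8081",
                "blob.server.port: 6124",
                "query.server.port: 6125",
                "blob.server.port: 6124",
                "query.server.port: 6125",
                "blob.server.port: 6124",
                "query.server.port: 6125");
        // prints {blob.server.port=3, query.server.port=3}
        System.out.println(findDuplicates(dump));
    }
}
```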
>
>
>
> On 04/07/2018 02:46 PM, JP de Vooght wrote:
>>
>> Nico, all,
>>
>> I am still stuck with this. I upgraded the Docker image to 1.4.2 and
>> the AMIDST library to 0.7.0.
>>
>> I just noticed this issue, which describes logging issues outside
>> transformations: https://issues.apache.org/jira/browse/FLINK-7990
>>
>> Could this be related? I don't see the connection to Logback, though.
>>
>> Below is the library code invoked after "BEFORE UPDATEMODEL":
>>
>> try {
>>     Configuration config = new Configuration();
>>     config.setString(BN_NAME, this.dag.getName());
>>     config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));
>>
>>     DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>     this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>             .withParameters(config)
>>             .reduce(new SufficientSatisticsReduce())
>>             .collect().get(0);
>>
>>     //Add the prior
>>     sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>
>>     JobExecutionResult result =
>>             dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>
>>     numInstances = result.getAccumulatorResult(
>>             ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
>>
>>     numInstances++; //Initial counts
>> } catch (Exception ex) {
>>     throw new UndeclaredThrowableException(ex);
>> }
>>
>> JP
>>
>> On 01/16/2018 10:50 AM, Nico Kruber wrote:
>>> Just a guess, but our logging initialisation probably changes the global
>>> log level (see conf/log4j.properties). DataSet.collect() executes the
>>> program along with creating a local Flink "cluster" (if you are testing
>>> locally / in an IDE) and initializing logging, among other things.
>>>
>>> Please comment out the first line and uncomment the following one so
>>> that it reads like this:
>>> ==========
>>> # This affects logging for both user code and Flink
>>> #log4j.rootLogger=INFO, file
>>>
>>> # Uncomment this if you want to _only_ change Flink's logging
>>> log4j.logger.org.apache.flink=INFO
>>> ==========
>>>
>>>
>>> Nico
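The fragment above relies on log4j's logger hierarchy: a logger takes its level from the nearest ancestor that has one configured, falling back to the root logger. A simplified model of that lookup over java.util.Properties (an assumption for illustration, not log4j's actual code; the class and method names are hypothetical) shows that with log4j.rootLogger commented out, Flink's own loggers still resolve to INFO while a user-code logger such as ParallelMLExample gets no level from this file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Simplified model (an assumption for illustration, not log4j's real code) of
// how log4j 1.x resolves a logger's level from log4j.properties entries.
class EffectiveLevelSketch {

    // Walks from the full logger name up through its dotted parents and returns
    // the level of the most specific "log4j.logger.<name>" entry; falls back to
    // "log4j.rootLogger"; returns null if neither is set in the file.
    static String effectiveLevel(Properties props, String loggerName) {
        String name = loggerName;
        while (true) {
            String value = props.getProperty("log4j.logger." + name);
            if (value != null) {
                return firstToken(value);
            }
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                break;
            }
            name = name.substring(0, dot); // move to the parent logger
        }
        String root = props.getProperty("log4j.rootLogger");
        return root == null ? null : firstToken(root);
    }

    // "INFO, file" -> "INFO": the level precedes the appender names.
    private static String firstToken(String value) {
        return value.split(",")[0].trim();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // The suggested fragment; lines starting with '#' are comments.
        props.load(new StringReader(
                "#log4j.rootLogger=INFO, file\n"
                + "log4j.logger.org.apache.flink=INFO\n"));

        // prints INFO (inherited from org.apache.flink)
        System.out.println(effectiveLevel(props, "org.apache.flink.runtime.taskmanager.TaskManager"));
        // prints null (no level configured for this logger in the file)
        System.out.println(effectiveLevel(props, "ParallelMLExample"));
    }
}
```

Real log4j 1.x still gives an unconfigured root logger a default level, and attaching appenders is a separate concern, so this only illustrates the lookup order.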
>>>
>>> On 13/01/18 13:52, jp@vooght.de wrote:
>>>> Hello,
>>>> I am learning Flink and using the docker image along with the AMIDST
>>>> library for this.
>>>> Below is a sample task from AMIDST which provides INFO output up until I
>>>> reach updateModel(). I pasted the short method as well and wonder what
>>>> prevents the Logger from producing output after that call.
>>>>
>>>>         //Set-up Flink session
>>>>         env = ExecutionEnvironment.getExecutionEnvironment();
>>>>         env.getConfig().disableSysoutLogging();
>>>>         Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>>>
>>>>         //generate a random dataset
>>>>         DataFlink<DataInstance> dataFlink =
>>>>                 new DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>>>
>>>>         //Creates a DAG with the NaiveBayes structure for the random dataset
>>>>         DAG dag = DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
>>>>                 "DiscreteVar4");
>>>>         LOG.info(dag.toString());
>>>>
>>>>         //Create the Learner object
>>>>         ParameterLearningAlgorithm learningAlgorithmFlink =
>>>>                 new ParallelMaximumLikelihood();
>>>>
>>>>         //Learning parameters
>>>>         learningAlgorithmFlink.setBatchSize(10);
>>>>         learningAlgorithmFlink.setDAG(dag);
>>>>
>>>>         //Initialize the learning process
>>>>         learningAlgorithmFlink.initLearning();
>>>>
>>>>         //Learn from the flink data
>>>>         LOG.info("BEFORE UPDATEMODEL");
>>>>         learningAlgorithmFlink.updateModel(dataFlink);
>>>>         LOG.info("AFTER UPDATEMODEL");
>>>>
>>>>         //Print the learnt Bayes Net
>>>>         BayesianNetwork bn = learningAlgorithmFlink.getLearntBayesianNetwork();
>>>>         LOG.info(bn.toString());
>>>>
>>>>
>>>> Below is the updateModel method.
>>>>
>>>>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>>>>         try {
>>>>             Configuration config = new Configuration();
>>>>             config.setString(BN_NAME, this.dag.getName());
>>>>             config.setBytes(EFBN_NAME,
>>>>                     Serialization.serializeObject(efBayesianNetwork));
>>>>
>>>>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>>>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>>>                     .withParameters(config)
>>>>                     .reduce(new SufficientSatisticsReduce())
>>>>                     .collect().get(0);
>>>>
>>>>             //Add the prior
>>>>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>>>
>>>>             JobExecutionResult result =
>>>>                     dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>>>
>>>>             numInstances = result.getAccumulatorResult(
>>>>                     ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
>>>>
>>>>             numInstances++;//Initial counts
>>>>
>>>>         }catch(Exception ex){
>>>>             throw new UndeclaredThrowableException(ex);
>>>>         }
>>>>
>>>>         return this.getLogMarginalProbability();
>>>>     }
>>>>
>>>>
>>>> I'm not sure why the LOG.info() calls after that method are not output
>>>> to the console.
>>>> TIA
>>>> JP
>>
>