Posted to user@flink.apache.org by "Ana M. Martinez" <an...@cs.aau.dk> on 2015/12/17 13:56:09 UTC

Problem to show logs in task managers

Hi flink community,

I am trying to show log messages using log4j.
It works fine overall, except for the messages I want to show in an inner class that implements org.apache.flink.api.common.aggregators.ConvergenceCriterion.
I am very new to this, but it seems I’m having problems showing the messages included in the isConverged function, perhaps because it runs on the task managers?
E.g. the log messages in the outer class (before the map-reduce operations) are shown properly.

I am also interested in providing my own log4j.properties file. I am using ./bin/flink run -m yarn-cluster on Amazon clusters.
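For concreteness, the kind of file I mean is a standard log4j configuration roughly like Flink's shipped conf/log4j.properties (the appender and pattern below are only illustrative placeholders; adjust them to your setup):

```properties
# Sketch of a log4j.properties; ${log.file} is substituted by Flink's launch scripts.
log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```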

Thanks,
Ana

Re: Problem to show logs in task managers

Posted by Robert Metzger <rm...@apache.org>.
Maybe the isConverged() method is never called? To make sure of that, just
throw a RuntimeException inside the method and see what happens.
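For instance, a self-contained sketch of that probe (using a stand-in interface here so it compiles on its own; the real interface to implement is org.apache.flink.api.common.aggregators.ConvergenceCriterion):

```java
// Self-contained sketch of the "fail loudly" probe. Criterion is a stand-in
// for Flink's ConvergenceCriterion so the snippet has no Flink dependency.
public class ConvergenceProbe {

    interface Criterion<T> {
        boolean isConverged(int iteration, T value);
    }

    static class LoudCriterion implements Criterion<Long> {
        @Override
        public boolean isConverged(int iteration, Long value) {
            // If the job fails with this exception, the method is definitely invoked.
            throw new RuntimeException("isConverged reached, iteration " + iteration);
        }
    }

    public static void main(String[] args) {
        try {
            new LoudCriterion().isConverged(1, 0L);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the exception never surfaces in the job, the criterion is simply not being called, which would explain the missing log lines.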

On Wed, Jan 6, 2016 at 3:58 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:

> Hi Till,
>
> I am afraid it does not work in any case.
>
> I am following the steps you indicate on your websites (for yarn
> configuration and loggers with slf4j):
>
> 1) Enable log aggregation in yarn-site:
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files
>
> 2) Include Loggers as indicated here (see WordCountExample below):
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html
>
> But I cannot get the log messages from the code that runs in the map
> functions. Am I missing something?
>
> Thanks,
> Ana
>
> On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org> wrote:
>
> I think the YARN application has to be finished in order for the logs to
> be accessible.
>
> Judging from your commands, you’re starting a long-running YARN application
> running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster
> won’t be used though, because you’re executing your job with ./bin/flink
> run -m yarn-cluster which will start another YARN application which is
> only alive as long as the Flink job is executed. If you want to run your
> job on the long running YARN application, then you simply have to omit -m
> yarn-cluster.
>
> Cheers,
> Till
> ​
>
> On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>
>> Hi Till,
>>
>> Sorry for the delay (Xmas break). I have activated log aggregation
>> on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find
>> a yarn-site.xml).
>> But the command yarn logs -applicationId application_1451903796996_0008
>> gives me the following output:
>>
>> INFO client.RMProxy: Connecting to ResourceManager at xxx
>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008 does
>> not exist.
>> Log aggregation has not completed or is not enabled
>>
>>
>> I’ve tried to restart the Flink JobManager and TaskManagers as follows:
>> ./bin/yarn-session.sh -n 1 -tm 2048 -s 4
>> and then with a detached screen, run my application with ./bin/flink run
>> -m yarn-cluster ...
>>
>> I am not sure if my problem is that I am not setting the
>> log-aggregation-enable property well or I am not restarting the Flink
>> JobManager and TaskManagers as I should… Any idea?
>>
>> Thanks,
>> Ana
>>
>> On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org> wrote:
>>
>> In which log file are you exactly looking for the logging statements? And
>> on what machine? You have to look on the machines on which the yarn
>> container were started. Alternatively if you have log aggregation
>> activated, then you can simply retrieve the log files via yarn logs.
>>
>> Cheers,
>> Till
>>
>> On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>
>>> Hi Till,
>>>
>>> Many thanks for your quick response.
>>>
>>> I have modified the WordCountExample to reproduce my problem in a
>>> simple example.
>>>
>>> I run the code below with the following command:
>>> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>> mypackage.WordCountExample ../flinklink.jar
>>>
>>> And if I check the log file I see all logger messages except the one in
>>> the flatMap function of the inner LineSplitter class, which is actually the
>>> one I am most interested in.
>>>
>>> Is that expected behaviour?
>>>
>>> Thanks,
>>> Ana
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>> public class WordCountExample {
>>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         logger.info("Entering application.");
>>>
>>>         DataSet<String> text = env.fromElements(
>>>                 "Who's there?",
>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>
>>>         List<Integer> elements = new ArrayList<Integer>();
>>>         elements.add(0);
>>>
>>>
>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>
>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>                 .flatMap(new LineSplitter())
>>>                 .withBroadcastSet(set, "set")
>>>                 .groupBy(0)
>>>                 .sum(1);
>>>
>>>         wordCounts.print();
>>>
>>>
>>>     }
>>>
>>>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>
>>>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>>>
>>>         @Override
>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>             for (String word : line.split(" ")) {
>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>             }
>>>         }
>>>     }
>>>
>>>     public static class TestClass implements Serializable {
>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>
>>>         static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");
>>>
>>>         List<Integer> integerList;
>>>         public TestClass(List<Integer> integerList){
>>>             this.integerList=integerList;
>>>             loggerTestClass.info("Logger in TestClass");
>>>         }
>>>
>>>
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>> On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:
>>>
>>> Hi Ana,
>>>
>>> you can simply modify the `log4j.properties` file in the `conf`
>>> directory. It should be automatically included in the Yarn application.
>>>
>>> Concerning your logging problem, it might be that you have set the
>>> logging level too high. Could you share the code with us?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>
>>>> Hi flink community,
>>>>
>>>> I am trying to show log messages using log4j.
>>>> It works fine overall except for the messages I want to show in an
>>>> inner class that implements
>>>> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
>>>> I am very new to this, but it seems that I’m having problems to show
>>>> the messages included in the isConverged function, as it runs in the task
>>>> managers?
>>>> E.g. the log messages in the outer class (before map-reduce operations)
>>>> are properly shown.
>>>>
>>>> I am also interested in providing my own log4j.properties file. I am
>>>> using the ./bin/flink run -m yarn-cluster on Amazon clusters.
>>>>
>>>> Thanks,
>>>> Ana
>>>
>>>
>>>
>>>
>>
>>
>
>

Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Hi Till,

Sorry for the delay, you were right, I was not restarting the yarn cluster…

Many thanks for your help!
Ana

On 11 Jan 2016, at 14:39, Till Rohrmann <tr...@apache.org> wrote:


You have to restart the yarn cluster to let your changes take effect. You can do that via HADOOP_HOME/sbin/stop-yarn.sh; HADOOP_HOME/sbin/start-yarn.sh.

The commands yarn-session.sh ... and bin/flink run -m yarn-cluster start a new yarn application within the yarn cluster.

Cheers,
Till

​

On Mon, Jan 11, 2016 at 1:39 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Thanks for your help. I have checked both in Yarn’s web interface and through the command line, and it seems that there are no occupied containers.

Additionally, I have checked the configuration values in the web interface, and even though I have set the yarn.log-aggregation-enable property to true in the yarn-site.xml file, it appears as false, with the following source label:
<property>
<name>yarn.log-aggregation-enable</name>
<value>false</value>
<source>java.io.BufferedInputStream@3c407114</source>
</property>

I am not sure if that is relevant. I had assumed that the ./bin/flink run -m yarn-cluster command starts a yarn session and thus reloads the yarn-site file. Is that right? If I am wrong here, how can I restart it so that the modifications in the yarn-site.xml file are picked up? (I have also tried with ./bin/yarn-session.sh and then ./bin/flink run, without success…)

I am not sure if this is related to flink anymore; should I move my problem to the yarn community instead?

Thanks,
Ana

On 11 Jan 2016, at 10:37, Till Rohrmann <tr...@apache.org> wrote:


Hi Ana,

good to hear that you found the logging statements. You can check in Yarn’s web interface whether there are still occupied containers. Alternatively, you can go to the different machines and run jps, which lists the running Java processes. If you see an ApplicationMaster or YarnTaskManagerRunner process, then there is still a container running Flink on this machine. I hope this helps you.
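For example, a quick check along those lines on each worker machine (a sketch; the process names are the ones mentioned above):

```shell
# Look for leftover Flink YARN processes on this machine; prints a note if none.
jps 2>/dev/null | grep -E 'ApplicationMaster|YarnTaskManagerRunner' \
  || echo "no Flink YARN containers on this machine"
```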

Cheers,
Till

​

On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Thanks for that! I can see the “Logger in LineSplitter.flatMap” output if I retrieve the task manager logs manually (under /var/log/hadoop-yarn/containers/application_X/…). However, that solution is not ideal when, for instance, I am using 32 machines for my map-reduce operations.

I would like to know why Yarn’s log aggregation is not working. Can you tell me how to check whether some Yarn containers are still running after the Flink job has finished? I have tried:
hadoop job -list
but I cannot see any jobs there, although I am not sure that means there are no containers running...

Thanks,
Ana

On 08 Jan 2016, at 16:24, Till Rohrmann <tr...@apache.org> wrote:


You’re right that the log statements of the LineSplitter are in the logs of the cluster nodes, because that’s where the LineSplitter code is executed. In contrast, you create a TestClass on the client when you submit the program. Therefore, you see the logging statement “Logger in TestClass” on the command line or in the cli log file.

So I would assume that the problem is Yarn’s log aggregation. Either your configuration is not correct or there are still some Yarn containers running after the Flink job has finished. Yarn will only show you the logs after all containers are terminated. Maybe you could check that. Alternatively, you can try to retrieve the taskmanager logs manually by going to the machine where your yarn container was executed. Then under hadoop/logs/userlogs you should find the logs somewhere.
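A sketch of that manual lookup (the log root below is an assumption; substitute whatever yarn.nodemanager.log-dirs points to on your nodes):

```shell
# Hypothetical defaults; override LOG_ROOT/APP_ID for your cluster.
LOG_ROOT=${LOG_ROOT:-/var/log/hadoop-yarn/containers}
APP_ID=${APP_ID:-application_1452250761414_0005}
if [ -d "$LOG_ROOT/$APP_ID" ]; then
  # One subdirectory per container, each holding the taskmanager/jobmanager logs.
  find "$LOG_ROOT/$APP_ID" -name '*.log' -o -name '*.out'
else
  echo "no container logs under $LOG_ROOT for $APP_ID"
fi
```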

Cheers,
Till

​

On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Thanks for the tip Robert! It was a good idea to rule out other possible causes, but I am afraid that is not the problem. If we stick to the WordCountExample (for simplicity), the Exception is thrown if placed into the flatMap function.

I am going to restate my problem and all the settings below:

When I try to aggregate all logs:
$ yarn logs -applicationId application_1452250761414_0005

the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005 does not exist.
Log aggregation has not completed or is not enabled.

(I tried the same command a few minutes later and got the same message, so it might be that log aggregation is not properly enabled?)

I am going to carefully enumerate all the steps I have followed (and settings) to see if someone can identify why the Logger messages from the CORE nodes (in an Amazon cluster) are not shown.

1) Enable yarn.log-aggregation-enable property to true in /etc/alternatives/hadoop-conf/yarn-site.xml.
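In yarn-site.xml that corresponds to an entry like:

```xml
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```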

2) Include log messages in my WordCountExample as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");

            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
                //throw new RuntimeException("LineSplitter class called");
            }

        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}

3) Start a yarn-cluster and execute my program with the following command:

$./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar


4) The output in the log folder is as follows:

13:31:04,945 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Current user: hadoop
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.65-b01
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Maximum heap size: 3344 MiBytes
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JAVA_HOME: /etc/alternatives/jre
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  Hadoop version: 2.6.0
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  JVM Options:
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -  Program Arguments:
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     run
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -m
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     yarn-cluster
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yn
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ys
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     4
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yjm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ytm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     -c
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     eu.amidst.flinklink.examples.WordCountExample
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     ../flinklink.jar
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Using configuration directory /home/hadoop/flink-0.10.0/conf
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Trying to load configuration file
13:31:05,193 INFO  org.apache.flink.client.CliFrontend                           - Running 'run' command.
13:31:05,201 INFO  org.apache.flink.client.CliFrontend                           - Building program from JAR file
13:31:05,326 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster mode detected. Switching Log4j output to console
13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli                   - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.FlinkYarnClient to locate the jar
13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager count = 1
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - JobManager memory = 1024
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager memory = 1024
13:31:06,112 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
13:31:06,843 INFO  org.apache.flink.yarn.Utils                                   - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
13:31:06,857 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
13:31:06,869 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Submitting application master application_1452250761414_0005
13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1452250761414_0005
13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Waiting for the cluster to be allocated
13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:11,472 INFO  Remoting                                                      - Starting remoting
13:31:11,698 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@172.31.33.221:39464]
13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster started
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - JobManager web interface address http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - Waiting until all TaskManagers have connected
13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,752 INFO  org.apache.flink.client.CliFrontend                           - No status updates from the YARN cluster received so far. Waiting ...
13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient                       - Successfully registered at the JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
13:31:12,253 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:12,753 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,254 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,755 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,255 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,756 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,257 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,758 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:16,258 INFO  org.apache.flink.client.CliFrontend                           - All TaskManagers are connected
13:31:16,264 INFO  org.apache.flink.client.program.Client                        - Starting client actor system
13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient                     - Starting JobClient actor system
13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:16,288 INFO  Remoting                                                      - Starting remoting
13:31:16,301 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:45919]
13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient                     - Started JobClient actor system at 127.0.0.1:45919
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Using the parallelism provided by the remote cluster (4). To use another parallelism, set it at the ./bin/flink client.
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Starting execution of program
13:31:16,303 INFO  org.apache.flink.client.program.Client                        - Starting program in interactive mode
13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample                 - Entering application.
13:31:16,342 INFO  TestClass.class                                               - Logger in TestClass
13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class eu.amidst.flinklink.examples.WordCountExample$TestClass is not a valid POJO type
13:31:16,376 INFO  org.apache.flink.client.CliFrontend                           - Program execution finished
13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,431 INFO  org.apache.flink.client.CliFrontend                           - Shutting down YARN cluster
13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Sending shutdown request to the Application Master
13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopYarnSession request to ApplicationMaster.
13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient                       - Remote JobManager has been stopped successfully. Stopping local application client
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Stopped Application client.
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Deleting files in hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Application application_1452250761414_0005 finished with state FINISHED and final state SUCCEEDED at 1452259876445
13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - YARN Client is shutting down


You can see the log messages from the WordCountExample and TestClass classes. But I have problems showing the logger message (INFO) from the LineSplitter class. Presumably because it is executed on the CORE nodes and not on the MASTER node (it all runs well on my local computer).

Any tips?
Ana


On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk>> wrote:

Hi Till,

I am afraid it does not work in any case.

I am following the steps you indicate on your websites (for yarn configuration and loggers with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I cannot get the log messages that run in the map functions. Am I missing something?

Thanks,
Ana

On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org>> wrote:


I think the YARN application has to be finished in order for the logs to be accessible.

Judging from you commands, you’re starting a long running YARN application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster won’t be used though, because you’re executing your job with ./bin/flink run -m yarn-cluster which will start another YARN application which is only alive as long as the Flink job is executed. If you want to run your job on the long running YARN application, then you simply have to omit -m yarn-cluster.

Cheers,
Till

​

On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk>> wrote:
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does not exist.
Log aggregation has not completed or is not enabled

I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...

I am not sure if my problem is that I am not setting the log-aggregation-enable property well or I am not restarting the Flink JobManager and TaskManagers as I should… Any idea?

Thanks,
Ana

On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org>> wrote:

In which log file are you exactly looking for the logging statements? And on what machine? You have to look on the machines on which the yarn container were started. Alternatively if you have log aggregation activated, then you can simply retrieve the log files via yarn logs.

Cheers,
Till

On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Many thanks for your quick response.

I have modified the WordCountExample to reproduce my problem in a simple example.

I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar

And if I check the log file I see all logger messages except the one in the flatMap function of the inner LineSplitter class, which is actually the one I am most interested in.

Is that the expected behaviour?

Thanks,
Ana


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}



On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:

Hi Ana,

you can simply modify the `log4j.properties` file in the `conf` directory. It should be automatically included in the Yarn application.

Concerning your logging problem, it might be that you have set the logging level too high. Could you share the code with us?
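For example, a minimal conf/log4j.properties that logs everything at INFO level into the file Flink passes in via -Dlog.file could look like this (a sketch along the lines of Flink's default configuration; the pattern layout is illustrative):

```properties
log4j.rootLogger=INFO, file

# Log all INFO-and-above messages to the file configured by the launcher
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```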

Cheers,
Till

On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi flink community,

I am trying to show log messages using log4j.
It works fine overall except for the messages I want to show in an inner class that implements org.apache.flink.api.common.aggregators.ConvergenceCriterion.
I am very new to this, but I seem to be having problems showing the messages included in the isConverged function; could it be because it runs on the task managers?
E.g. the log messages in the outer class (before map-reduce operations) are properly shown.

I am also interested in providing my own log4j.properties file. I am using ./bin/flink run -m yarn-cluster on Amazon clusters.

Thanks,
Ana














Re: Problem to show logs in task managers

Posted by Till Rohrmann <tr...@apache.org>.
You have to restart the yarn cluster to let your changes take effect. You
can do that via HADOOP_HOME/sbin/stop-yarn.sh;
HADOOP_HOME/sbin/start-yarn.sh.

The commands yarn-session.sh ... and bin/flink run -m yarn-cluster each start a
new YARN application within the YARN cluster.

Cheers,
Till

On Mon, Jan 11, 2016 at 1:39 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:

> Hi Till,
>
> Thanks for your help. I have checked both in Yarn’s web interface and
> through command line and it seems that there are not occupied containers.
>
> Additionally, I have checked the configuration values in the web interface
> and even though I have changed the log.aggregation property in the
> yarn-site.xml file to true, it appears as false and with the following
> source label:
> <property>
> <name>yarn.log-aggregation-enable</name>
> <value>false</value>
> <source>java.io.BufferedInputStream@3c407114</source>
> </property>
>
> I am not sure if that is relevant. I had assumed that the "./bin/flink run
> -m yarn-cluster" command is starting a yarn session and thus reloading the
> yarn-site file. Is that right? If I am wrong here, then, how can I restart
> it so that the modifications in the yarn-site.xml file are considered? (I
> have also tried with ./bin/yarn-session.sh and then ./bin/flink run without
> success…).
>
> I am not sure if this is related to flink anymore, should I move my
> problem to the yarn community instead?
>
> Thanks,
> Ana
>
> On 11 Jan 2016, at 10:37, Till Rohrmann <tr...@apache.org> wrote:
>
> Hi Ana,
>
> good to hear that you found the logging statements. You can check in
> Yarn’s web interface whether there are still occupied containers.
> Alternatively you can go to the different machines and run jps which
> lists you the running Java processes. If you see an ApplicationMaster or
> YarnTaskManagerRunner process, then there is still a container running
> with Flink on this machine. I hope this helps you.
>
> Cheers,
> Till
>
> On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>
>> Hi Till,
>>
>> Thanks for that! I can see the "Logger in LineSplitter.flatMap” output if
>> I retrieve the task manager logs manually
>> (under /var/log/hadoop-yarn/containers/application_X/…). However that
>> solution is not ideal when for instance I am using 32 machines for my
>> mapReduce operations.
>>
>> I would like to know why Yarn’s log aggregation is not working. Can you
>> tell me how to check if there are some Yarn containers running after the
>> Flink job has finished? I have tried:
>> hadoop job -list
>> but I cannot see any jobs there, although I am not sure that it means
>> that there are not containers running...
>>
>> Thanks,
>> Ana
>>
>> On 08 Jan 2016, at 16:24, Till Rohrmann <tr...@apache.org> wrote:
>>
>> You’re right that the log statements of the LineSplitter are in the logs
>> of the cluster nodes, because that’s where the LineSplitter code is
>> executed. In contrast, you create a TestClass on the client when you
>> submit the program. Therefore, you see the logging statement “Logger in
>> TestClass” on the command line or in the cli log file.
>>
>> So I would assume that the problem is Yarn’s log aggregation. Either your
>> configuration is not correct or there are still some Yarn containers
>> running after the Flink job has finished. Yarn will only show you the logs
>> after all containers are terminated. Maybe you could check that.
>> Alternatively, you can try to retrieve the taskmanager logs manually by
>> going to the machine where your yarn container was executed. Then under
>> hadoop/logs/userlogs you should find somewhere the logs.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>
>>> Thanks for the tip Robert! It was a good idea to rule out other possible
>>> causes, but I am afraid that is not the problem. If we stick to the
>>> WordCountExample (for simplicity), the Exception is thrown if placed into
>>> the flatMap function.
>>>
>>> I am going to try to re-write my problem and all the settings below:
>>>
>>> When I try to aggregate all logs:
>>>  $yarn logs -applicationId application_1452250761414_0005
>>>
>>> the following message is retrieved:
>>> 16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at
>>> ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
>>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005 does
>>> not exist.
>>> Log aggregation has not completed or is not enabled.
>>>
>>> (Tried the same command a few minutes later and got the same message, so
>>> might it be that log aggregation is not properly enabled?)
>>>
>>> I am going to carefully enumerate all the steps I have followed (and
>>> settings) to see if someone can identify why the Logger messages from CORE
>>> nodes (in an Amazon cluster) are not shown.
>>>
>>> 1) Enable yarn.log-aggregation-enable property to true
>>> in /etc/alternatives/hadoop-conf/yarn-site.xml.
>>>
>>> 2) Include log messages in my WordCountExample as follows:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>>
>>> public class WordCountExample {
>>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         logger.info("Entering application.");
>>>
>>>         DataSet<String> text = env.fromElements(
>>>                 "Who's there?",
>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>
>>>         List<Integer> elements = new ArrayList<Integer>();
>>>         elements.add(0);
>>>
>>>
>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>
>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>                 .flatMap(new LineSplitter())
>>>                 .withBroadcastSet(set, "set")
>>>                 .groupBy(0)
>>>                 .sum(1);
>>>
>>>         wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);
>>>
>>>
>>>     }
>>>
>>>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>
>>>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>>>
>>>         @Override
>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>
>>>             for (String word : line.split(" ")) {
>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>                 //throw new RuntimeException("LineSplitter class called");
>>>             }
>>>
>>>         }
>>>     }
>>>
>>>     public static class TestClass implements Serializable {
>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>
>>>         static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");
>>>
>>>         List<Integer> integerList;
>>>         public TestClass(List<Integer> integerList){
>>>             this.integerList=integerList;
>>>             loggerTestClass.info("Logger in TestClass");
>>>         }
>>>
>>>
>>>     }
>>> }
>>>
>>> 3) Start a yarn-cluster and execute my program with the following
>>> command:
>>>
>>> $./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>> eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar
>>>
>>>
>>> 4) The output in the log folder is as follows:
>>>
>>> 13:31:04,945 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> --------------------------------------------------------------------------------
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4,
>>> Date:10.11.2015 @ 13:50:14 UTC)
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  Current user: hadoop
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
>>> 1.8/25.65-b01
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  Maximum heap size: 3344 MiBytes
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  JAVA_HOME: /etc/alternatives/jre
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -  Hadoop version: 2.6.0
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -  JVM Options:
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -  Program Arguments:
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     run
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -m
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     yarn-cluster
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -yn
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     1
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -ys
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     4
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -yjm
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     1024
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -ytm
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     1024
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -     -c
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -     eu.amidst.flinklink.examples.WordCountExample
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -     ../flinklink.jar
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> --------------------------------------------------------------------------------
>>> 13:31:04,954 INFO  org.apache.flink.client.CliFrontend
>>>         - Using configuration directory /home/hadoop/flink-0.10.0/conf
>>> 13:31:04,954 INFO  org.apache.flink.client.CliFrontend
>>>         - Trying to load configuration file
>>> 13:31:05,193 INFO  org.apache.flink.client.CliFrontend
>>>         - Running 'run' command.
>>> 13:31:05,201 INFO  org.apache.flink.client.CliFrontend
>>>         - Building program from JAR file
>>> 13:31:05,326 INFO  org.apache.flink.client.CliFrontend
>>>         - YARN cluster mode detected. Switching Log4j output to console
>>> 13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy
>>>         - Connecting to ResourceManager at ip-172-31-33-221.us
>>> -west-2.compute.internal/172.31.33.221:8032
>>> 13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli
>>>         - No path for the flink jar passed. Using the location of class
>>> org.apache.flink.yarn.FlinkYarnClient to locate the jar
>>> 13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Using values:
>>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - TaskManager count = 1
>>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - JobManager memory = 1024
>>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - TaskManager memory = 1024
>>> 13:31:06,112 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from
>>> file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
>>> 13:31:06,843 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
>>> 13:31:06,857 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
>>> 13:31:06,869 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties
>>> to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
>>> 13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Submitting application master application_1452250761414_0005
>>> 13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
>>>         - Submitted application application_1452250761414_0005
>>> 13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Waiting for the cluster to be allocated
>>> 13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - YARN application has been deployed successfully.
>>> 13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Start actor system.
>>> 13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger
>>>         - Slf4jLogger started
>>> 13:31:11,472 INFO  Remoting
>>>         - Starting remoting
>>> 13:31:11,698 INFO  Remoting
>>>         - Remoting started; listening on addresses :[
>>> akka.tcp://flink@172.31.33.221:39464]
>>> 13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Start application client.
>>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>>         - YARN cluster started
>>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>>         - JobManager web interface address
>>> http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
>>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>>         - Waiting until all TaskManagers have connected
>>> 13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Notification about new leader address
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID
>>> null.
>>> 13:31:11,752 INFO  org.apache.flink.client.CliFrontend
>>>         - No status updates from the YARN cluster received so far. Waiting
>>> ...
>>> 13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Received address of new leader
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID
>>> null.
>>> 13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Disconnect from JobManager null.
>>> 13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Trying to register at JobManager
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
>>> 13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Successfully registered at the JobManager Actor[
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
>>> 13:31:12,253 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:12,753 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:13,254 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:13,755 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:14,255 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:14,756 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:15,257 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:15,758 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:16,258 INFO  org.apache.flink.client.CliFrontend
>>>         - All TaskManagers are connected
>>> 13:31:16,264 INFO  org.apache.flink.client.program.Client
>>>         - Starting client actor system
>>> 13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient
>>>         - Starting JobClient actor system
>>> 13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger
>>>         - Slf4jLogger started
>>> 13:31:16,288 INFO  Remoting
>>>         - Starting remoting
>>> 13:31:16,301 INFO  Remoting
>>>         - Remoting started; listening on addresses :[
>>> akka.tcp://flink@127.0.0.1:45919]
>>> 13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient
>>>         - Started JobClient actor system at 127.0.0.1:45919
>>> 13:31:16,302 INFO  org.apache.flink.client.CliFrontend
>>>         - Using the parallelism provided by the remote cluster (4). To use
>>> another parallelism, set it at the ./bin/flink client.
>>> 13:31:16,302 INFO  org.apache.flink.client.CliFrontend
>>>         - Starting execution of program
>>> 13:31:16,303 INFO  org.apache.flink.client.program.Client
>>>         - Starting program in interactive mode
>>> 13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample
>>>         - Entering application.
>>> 13:31:16,342 INFO  TestClass.class
>>>         - Logger in TestClass
>>> 13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>>         - class eu.amidst.flinklink.examples.WordCountExample$TestClass is
>>> not a valid POJO type
>>> 13:31:16,376 INFO  org.apache.flink.client.CliFrontend
>>>         - Program execution finished
>>> 13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Shutting down remote daemon.
>>> 13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remote daemon shut down; proceeding with flushing remote
>>> transports.
>>> 13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remoting shut down.
>>> 13:31:16,431 INFO  org.apache.flink.client.CliFrontend
>>>         - Shutting down YARN cluster
>>> 13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Sending shutdown request to the Application Master
>>> 13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Sending StopYarnSession request to ApplicationMaster.
>>> 13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Remote JobManager has been stopped successfully. Stopping local
>>> application client
>>> 13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Stopped Application client.
>>> 13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Disconnect from JobManager Actor[
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
>>> 13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Shutting down remote daemon.
>>> 13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remote daemon shut down; proceeding with flushing remote
>>> transports.
>>> 13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remoting shut down.
>>> 13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Deleting files in
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
>>> 13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Application application_1452250761414_0005 finished with state
>>> FINISHED and final state SUCCEEDED at 1452259876445
>>> 13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - YARN Client is shutting down
>>>
>>>
>>> You can see the log messages from the WordCountExample and TestClass
>>> classes. But I have problems showing the logger message (INFO) from the
>>> LineSplitter class, presumably because it is executed on the CORE nodes
>>> and not on the MASTER node (it all runs well on my local computer).
>>>
>>> Any tips?
>>> Ana
>>>
>>>
>>> On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>
>>> Hi Till,
>>>
>>> I am afraid it does not work in any case.
>>>
>>> I am following the steps you indicate on your websites (for yarn
>>> configuration and loggers with slf4j):
>>>
>>> 1) Enable log aggregation in yarn-site:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files
>>>
>>> 2) Include Loggers as indicated here (see WordCountExample below):
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html
>>>
>>> But I cannot get the log messages that run in the map functions. Am I
>>> missing something?
>>>
>>> Thanks,
>>> Ana
>>>
>>> On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org> wrote:
>>>
>>> I think the YARN application has to be finished in order for the logs to
>>> be accessible.
>>>
>>> Judging from your commands, you’re starting a long-running YARN
>>> application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4.
>>> This cluster won’t be used though, because you’re executing your job with ./bin/flink
>>> run -m yarn-cluster which will start another YARN application which is
>>> only alive as long as the Flink job is executed. If you want to run your
>>> job on the long running YARN application, then you simply have to omit -m
>>> yarn-cluster.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Sorry for the delay (Xmas break). I have activated log aggregation
>>>> on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find
>>>> a yarn-site.xml).
>>>> But the command yarn logs -applicationId application_1451903796996_0008
>>>> gives me the following output:
>>>>
>>>> INFO client.RMProxy: Connecting to ResourceManager at xxx
>>>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008 does
>>>> not exist.
>>>> Log aggregation has not completed or is not enabled
>>>>
>>>>
>>>> I’ve tried to restart the Flink JobManager and TaskManagers as follows:
>>>> ./bin/yarn-session.sh -n 1 -tm 2048 -s 4
>>>> and then with a detached screen, run my application with ./bin/flink
>>>> run -m yarn-cluster ...
>>>>
>>>> I am not sure if my problem is that I am not setting the
>>>> log-aggregation-enable property well or I am not restarting the Flink
>>>> JobManager and TaskManagers as I should… Any idea?
>>>>
>>>> Thanks,
>>>> Ana
>>>>
>>>> On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org> wrote:
>>>>
>>>> In which log file are you exactly looking for the logging statements?
>>>> And on what machine? You have to look on the machines on which the yarn
>>>> container were started. Alternatively if you have log aggregation
>>>> activated, then you can simply retrieve the log files via yarn logs.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Many thanks for your quick response.
>>>>>
>>>>> I have modified the WordCountExample to reproduce my problem in a
>>>>> simple example.
>>>>>
>>>>> I run the code below with the following command:
>>>>> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>>>> mypackage.WordCountExample ../flinklink.jar
>>>>>
>>>>> And if I check the log file I see all logger messages except the one
>>>>> in the flatMap function of the inner LineSplitter class, which is actually
>>>>> the one I am most interested in.
>>>>>
>>>>> Is that an expected behaviour?
>>>>>
>>>>> Thanks,
>>>>> Ana
>>>>>
>>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.util.Collector;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>>
>>>>> import java.io.Serializable;
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>>
>>>>> public class WordCountExample {
>>>>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         logger.info("Entering application.");
>>>>>
>>>>>         DataSet<String> text = env.fromElements(
>>>>>                 "Who's there?",
>>>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>>>
>>>>>         List<Integer> elements = new ArrayList<Integer>();
>>>>>         elements.add(0);
>>>>>
>>>>>
>>>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>>>
>>>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>>>                 .flatMap(new LineSplitter())
>>>>>                 .withBroadcastSet(set, "set")
>>>>>                 .groupBy(0)
>>>>>                 .sum(1);
>>>>>
>>>>>         wordCounts.print();
>>>>>
>>>>>
>>>>>     }
>>>>>
>>>>>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>>>
>>>>>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>>>>>
>>>>>         @Override
>>>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>>>             for (String word : line.split(" ")) {
>>>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static class TestClass implements Serializable {
>>>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>>>
>>>>>         static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");
>>>>>
>>>>>         List<Integer> integerList;
>>>>>         public TestClass(List<Integer> integerList){
>>>>>             this.integerList=integerList;
>>>>>             loggerTestClass.info("Logger in TestClass");
>>>>>         }
>>>>>
>>>>>
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:
>>>>>
>>>>> Hi Ana,
>>>>>
>>>>> you can simply modify the `log4j.properties` file in the `conf`
>>>>> directory. It should be automatically included in the Yarn application.
>>>>>
>>>>> Concerning your logging problem, it might be that you have set the
>>>>> logging level too high. Could you share the code with us?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk>
>>>>> wrote:
>>>>>
>>>>>> Hi flink community,
>>>>>>
>>>>>> I am trying to show log messages using log4j.
>>>>>> It works fine overall except for the messages I want to show in an
>>>>>> inner class that implements
>>>>>> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
>>>>>> I am very new to this, but it seems that I’m having problems to show
>>>>>> the messages included in the isConverged function, as it runs in the task
>>>>>> managers?
>>>>>> E.g. the log messages in the outer class (before map-reduce
>>>>>> operations) are properly shown.
>>>>>>
>>>>>> I am also interested in providing my own log4j.properties file. I am
>>>>>> using the ./bin/flink run -m yarn-cluster on Amazon clusters.
>>>>>>
>>>>>> Thanks,
>>>>>> Ana
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>

Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Hi Till,

Thanks for your help. I have checked both in Yarn’s web interface and through the command line, and it seems that there are no occupied containers.

Additionally, I have checked the configuration values in the web interface, and even though I have set the yarn.log-aggregation-enable property to true in the yarn-site.xml file, it still appears as false, with the following source label:
<property>
<name>yarn.log-aggregation-enable</name>
<value>false</value>
<source>java.io.BufferedInputStream@3c407114</source>
</property>

I am not sure if that is relevant. I had assumed that the "./bin/flink run -m yarn-cluster" command is starting a yarn session and thus reloading the yarn-site file. Is that right? If I am wrong here, then, how can I restart it so that the modifications in the yarn-site.xml file are considered? (I have also tried with ./bin/yarn-session.sh and then ./bin/flink run without success…).
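
For reference, the fragment being set in /etc/alternatives/hadoop-conf/yarn-site.xml would look like this (a sketch; the restart requirement is an assumption based on how Hadoop loads its configuration at startup):

```xml
<!-- Enable log aggregation so that `yarn logs -applicationId ...` works.
     The YARN daemons read yarn-site.xml only at startup, so submitting a
     Flink job does not reload it; the ResourceManager and NodeManagers
     themselves have to be restarted for the change to take effect. -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```

The java.io.BufferedInputStream source label above only indicates which stream the value was read from; that the running daemons still report false suggests they have not picked up the edited file.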

I am not sure if this is related to Flink anymore; should I move my problem to the YARN community instead?

Thanks,
Ana

On 11 Jan 2016, at 10:37, Till Rohrmann <tr...@apache.org> wrote:


Hi Ana,

good to hear that you found the logging statements. You can check in Yarn’s web interface whether there are still occupied containers. Alternatively you can go to the different machines and run jps which lists you the running Java processes. If you see an ApplicationMaster or YarnTaskManagerRunner process, then there is still a container running with Flink on this machine. I hope this helps you.
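
A minimal per-node check along these lines (the class names are the ones above; using ps as a fallback for hosts where jps is not on the PATH is an assumption):

```shell
# Look for leftover Flink YARN container JVMs on this node.
# ApplicationMaster / YarnTaskManagerRunner are the Flink-on-YARN processes
# mentioned above; `grep -v grep` drops the grep process itself.
if ps -ef | grep -E 'ApplicationMaster|YarnTaskManagerRunner' | grep -v grep; then
  echo "Flink containers still running on this node"
else
  echo "no Flink containers on this node"
fi
```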

Cheers,
Till


On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Thanks for that! I can see the “Logger in LineSplitter.flatMap” output if I retrieve the task manager logs manually (under /var/log/hadoop-yarn/containers/application_X/…). However, that solution is not ideal when, for instance, I am using 32 machines for my MapReduce operations.

I would like to know why Yarn’s log aggregation is not working. Can you tell me how to check whether some Yarn containers are still running after the Flink job has finished? I have tried:
hadoop job -list
but I cannot see any jobs there, although I am not sure whether that means there are no containers running...

Thanks,
Ana

On 08 Jan 2016, at 16:24, Till Rohrmann <tr...@apache.org> wrote:


You’re right that the log statements of the LineSplitter are in the logs of the cluster nodes, because that’s where the LineSplitter code is executed. In contrast, you create a TestClass on the client when you submit the program. Therefore, you see the logging statement “Logger in TestClass” on the command line or in the cli log file.

So I would assume that the problem is Yarn’s log aggregation. Either your configuration is not correct or there are still some Yarn containers running after the Flink job has finished. Yarn will only show you the logs after all containers are terminated. Maybe you could check that. Alternatively, you can try to retrieve the taskmanager logs manually by going to the machine where your yarn container was executed. Then, under hadoop/logs/userlogs, you should find the logs.
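
To make the client/cluster split concrete, here is a plain-Java sketch with no Flink dependency (class and field names are invented): the instance state of a function object is what travels to the task managers, while static fields such as a Logger are never serialized; they are re-initialized by whichever JVM loads the class, under that JVM’s own logging configuration.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Sketch: static fields (like a static Logger) are not part of an object's
// serialized form, so they are re-created per JVM -- on YARN, in each
// task manager -- rather than shipped from the client.
public class StaticFieldDemo {
    static class Splitter implements Serializable {
        private static final long serialVersionUID = 1L;
        // Stands in for `static Logger loggerLineSplitter = ...`.
        static final String LOG_MARKER = "created-in-this-jvm";
        final String instanceField;
        Splitter(String instanceField) { this.instanceField = instanceField; }
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new Splitter("line-splitter-state"));
        }
        // Decode bytes 1:1 so we can search the serialized payload as text.
        String payload = new String(bos.toByteArray(), StandardCharsets.ISO_8859_1);
        // Prints:
        //   instance field serialized: true
        //   static field serialized:   false
        System.out.println("instance field serialized: " + payload.contains("line-splitter-state"));
        System.out.println("static field serialized:   " + payload.contains("created-in-this-jvm"));
    }
}
```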

Cheers,
Till


On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Thanks for the tip, Robert! It was a good idea to rule out other possible causes, but I am afraid that is not the problem. If we stick to the WordCountExample (for simplicity), the exception is indeed thrown when placed into the flatMap function.

I am going to re-state my problem and all the settings below:

When I try to aggregate all logs:
 $yarn logs -applicationId application_1452250761414_0005

the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does not exist.
Log aggregation has not completed or is not enabled.

(Tried the same command a few minutes later and got the same message, so might it be that log aggregation is not properly enabled??)

I am going to carefully enumerate all the steps I have followed (and settings) to see if someone can identify why the Logger messages from CORE nodes (in an Amazon cluster) are not shown.

1) Enable yarn.log-aggregation-enable property to true in /etc/alternatives/hadoop-conf/yarn-site.xml.

2) Include log messages in my WordCountExample as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");

            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
                //throw new RuntimeException("LineSplitter class called");
            }

        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}

3) Start a yarn-cluster and execute my program with the following command:

$./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar


4) The output in the log folder is as follows:

13:31:04,945 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Current user: hadoop
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.65-b01
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Maximum heap size: 3344 MiBytes
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JAVA_HOME: /etc/alternatives/jre
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  Hadoop version: 2.6.0
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  JVM Options:
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -  Program Arguments:
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     run
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -m
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     yarn-cluster
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yn
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ys
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     4
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yjm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ytm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     -c
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     eu.amidst.flinklink.examples.WordCountExample
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     ../flinklink.jar
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Using configuration directory /home/hadoop/flink-0.10.0/conf
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Trying to load configuration file
13:31:05,193 INFO  org.apache.flink.client.CliFrontend                           - Running 'run' command.
13:31:05,201 INFO  org.apache.flink.client.CliFrontend                           - Building program from JAR file
13:31:05,326 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster mode detected. Switching Log4j output to console
13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli                   - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.FlinkYarnClient to locate the jar
13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager count = 1
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - JobManager memory = 1024
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager memory = 1024
13:31:06,112 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
13:31:06,843 INFO  org.apache.flink.yarn.Utils                                   - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
13:31:06,857 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
13:31:06,869 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Submitting application master application_1452250761414_0005
13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1452250761414_0005
13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Waiting for the cluster to be allocated
13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:11,472 INFO  Remoting                                                      - Starting remoting
13:31:11,698 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@172.31.33.221:39464]
13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster started
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - JobManager web interface address http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - Waiting until all TaskManagers have connected
13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,752 INFO  org.apache.flink.client.CliFrontend                           - No status updates from the YARN cluster received so far. Waiting ...
13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient                       - Successfully registered at the JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
13:31:12,253 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:12,753 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,254 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,755 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,255 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,756 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,257 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,758 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:16,258 INFO  org.apache.flink.client.CliFrontend                           - All TaskManagers are connected
13:31:16,264 INFO  org.apache.flink.client.program.Client                        - Starting client actor system
13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient                     - Starting JobClient actor system
13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:16,288 INFO  Remoting                                                      - Starting remoting
13:31:16,301 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:45919]
13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient                     - Started JobClient actor system at 127.0.0.1:45919
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Using the parallelism provided by the remote cluster (4). To use another parallelism, set it at the ./bin/flink client.
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Starting execution of program
13:31:16,303 INFO  org.apache.flink.client.program.Client                        - Starting program in interactive mode
13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample                 - Entering application.
13:31:16,342 INFO  TestClass.class                                               - Logger in TestClass
13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class eu.amidst.flinklink.examples.WordCountExample$TestClass is not a valid POJO type
13:31:16,376 INFO  org.apache.flink.client.CliFrontend                           - Program execution finished
13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,431 INFO  org.apache.flink.client.CliFrontend                           - Shutting down YARN cluster
13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Sending shutdown request to the Application Master
13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopYarnSession request to ApplicationMaster.
13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient                       - Remote JobManager has been stopped successfully. Stopping local application client
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Stopped Application client.
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Deleting files in hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Application application_1452250761414_0005 finished with state FINISHED and final state SUCCEEDED at 1452259876445
13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - YARN Client is shutting down


You can see the log messages from the WordCountExample and TestClass classes. But I have problems showing the logger message (INFO) from the LineSplitter class, presumably because it is executed on the CORE nodes and not on the MASTER node (it all runs well on my local computer).

Any tips?
Ana


On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk> wrote:

Hi Till,

I am afraid it does not work in any case.

I am following the steps you indicate on your websites (for yarn configuration and loggers with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I cannot get the log messages from the code that runs in the map functions. Am I missing something?

Thanks,
Ana

On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org> wrote:


I think the YARN application has to be finished in order for the logs to be accessible.

Judging from your commands, you’re starting a long-running YARN application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster won’t be used, though, because you’re executing your job with ./bin/flink run -m yarn-cluster, which will start another YARN application that is only alive as long as the Flink job is executed. If you want to run your job on the long-running YARN application, then you simply have to omit -m yarn-cluster.

Cheers,
Till


On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does not exist.
Log aggregation has not completed or is not enabled

I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...

I am not sure whether my problem is that I am not setting the log-aggregation-enable property correctly or that I am not restarting the Flink JobManager and TaskManagers as I should… Any idea?

Thanks,
Ana

On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org> wrote:

In which log file are you exactly looking for the logging statements? And on what machine? You have to look on the machines on which the yarn container were started. Alternatively if you have log aggregation activated, then you can simply retrieve the log files via yarn logs.

Cheers,
Till

On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Many thanks for your quick response.

I have modified the WordCountExample to re-reproduce my problem in a simple example.

I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar

And if I check the log file I see all logger messages except the one in the flatMap function of the inner LineSplitter class, which is actually the one I am most interested in.

Is that an expected behaviour?

Thanks,
Ana


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}



On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:

Hi Ana,

you can simply modify the `log4j.properties` file in the `conf` directory. It should be automatically included in the Yarn application.
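
As a sketch, a conf/log4j.properties along these lines keeps INFO-level messages from user code visible (the layout below mirrors Flink's shipped default; treat the exact pattern as an assumption):

```properties
# Root logger: INFO level, writing to the file Flink passes in via -Dlog.file.
log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

Because this file is shipped with the YARN application (see the "Copying ... log4j.properties" lines elsewhere in this thread), the same settings apply inside the task manager containers.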

Concerning your logging problem, it might be that you have set the logging level too high. Could you share the code with us?

Cheers,
Till

On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi flink community,

I am trying to show log messages using log4j.
It works fine overall except for the messages I want to show in an inner class that implements org.apache.flink.api.common.aggregators.ConvergenceCriterion.
I am very new to this, but it seems that I’m having problems to show the messages included in the isConverged function, as it runs in the task managers?
E.g. the log messages in the outer class (before map-reduce operations) are properly shown.

I am also interested in providing my own log4j.properties file. I am using the ./bin/flink run -m yarn-cluster on Amazon clusters.

Thanks,
Ana












Re: Problem to show logs in task managers

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ana,

good to hear that you found the logging statements. You can check in Yarn’s
web interface whether there are still occupied containers. Alternatively
you can go to the different machines and run jps, which lists the
running Java processes. If you see an ApplicationMaster or
YarnTaskManagerRunner process, then there is still a container running with
Flink on this machine. I hope this helps you.

Cheers,
Till

On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <an...@cs.aau.dk> wrote:

> Hi Till,
>
> Thanks for that! I can see the "Logger in LineSplitter.flatMap” output if
> I retrieve the task manager logs manually
> (under /var/log/hadoop-yarn/containers/application_X/…). However that
> solution is not ideal when for instance I am using 32 machines for my
> mapReduce operations.
>
> I would like to know why Yarn’s log aggregation is not working. Can you
> tell me how to check if there are some Yarn containers running after the
> Flink job has finished? I have tried:
> hadoop job -list
> but I cannot see any jobs there, although I am not sure that it means that
> there are not containers running...
>
> Thanks,
> Ana
>
> On 08 Jan 2016, at 16:24, Till Rohrmann <tr...@apache.org> wrote:
>
> You’re right that the log statements of the LineSplitter are in the logs
> of the cluster nodes, because that’s where the LineSplitter code is
> executed. In contrast, you create a TestClass on the client when you
> submit the program. Therefore, you see the logging statement “Logger in
> TestClass” on the command line or in the cli log file.
>
> So I would assume that the problem is Yarn’s log aggregation. Either your
> configuration is not correct or there are still some Yarn containers
> running after the Flink job has finished. Yarn will only show you the logs
> after all containers are terminated. Maybe you could check that.
> Alternatively, you can try to retrieve the taskmanager logs manually by
> going to the machine where your yarn container was executed. Then under
> hadoop/logs/userlogs you should find somewhere the logs.
>
> Cheers,
> Till
> ​
>
> On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>
>> Thanks for the tip Robert! It was a good idea to rule out other possible
>> causes, but I am afraid that is not the problem. If we stick to the
>> WordCountExample (for simplicity), the Exception is thrown if placed into
>> the flatMap function.
>>
>> I am going to try to re-write my problem and all the settings below:
>>
>> When I try to aggregate all logs:
>>  $yarn logs -applicationId application_1452250761414_0005
>>
>> the following message is retrieved:
>> 16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at
>> ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does
>> not exist.
>> Log aggregation has not completed or is not enabled.
>>
>> (Tried the same command a few minutes later and got the same message, so
>> might it be that log aggregation is not properly enabled??)
>>
>> I am going to carefully enumerate all the steps I have followed (and
>> settings) to see if someone can identify why the Logger messages from CORE
>> nodes (in an Amazon cluster) are not shown.
>>
>> 1) Enable yarn.log-aggregation-enable property to true
>> in /etc/alternatives/hadoop-conf/yarn-site.xml.
>>
>> 2) Include log messages in my WordCountExample as follows:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.util.Collector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.Serializable;
>> import java.util.ArrayList;
>> import java.util.List;
>>
>>
>> public class WordCountExample {
>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>
>>     public static void main(String[] args) throws Exception {
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>         logger.info("Entering application.");
>>
>>         DataSet<String> text = env.fromElements(
>>                 "Who's there?",
>>                 "I think I hear them. Stand, ho! Who's there?");
>>
>>         List<Integer> elements = new ArrayList<Integer>();
>>         elements.add(0);
>>
>>
>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>
>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>                 .flatMap(new LineSplitter())
>>                 .withBroadcastSet(set, "set")
>>                 .groupBy(0)
>>                 .sum(1);
>>
>>         wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);
>>
>>
>>     }
>>
>>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>
>>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>>
>>         @Override
>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>
>> 		for (String word : line.split(" ")) {
>>     			out.collect(new Tuple2<String, Integer>(word, 1));
>>     			//throw new RuntimeException("LineSplitter class called");
>> 		}
>>
>>         }
>>     }
>>
>>     public static class TestClass implements Serializable {
>>         private static final long serialVersionUID = -2932037991574118651L;
>>
>>         static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");
>>
>>         List<Integer> integerList;
>>         public TestClass(List<Integer> integerList){
>>             this.integerList=integerList;
>>             loggerTestClass.info("Logger in TestClass");
>>         }
>>
>>
>>     }
>> }
>>
>> 3) Start a yarn-cluster and execute my program with the following command:
>>
>> $./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>> eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar
>>
>>
>> 4) The output in the log folder is as follows:
>>
>> 13:31:04,945 INFO  org.apache.flink.client.CliFrontend
>>         -
>> --------------------------------------------------------------------------------
>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>         -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4,
>> Date:10.11.2015 @ 13:50:14 UTC)
>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>         -  Current user: hadoop
>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
>> 1.8/25.65-b01
>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>         -  Maximum heap size: 3344 MiBytes
>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>         -  JAVA_HOME: /etc/alternatives/jre
>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>         -  Hadoop version: 2.6.0
>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>         -  JVM Options:
>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>         -
>> -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>         -
>> -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>         -
>> -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -  Program Arguments:
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     run
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     -m
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     yarn-cluster
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     -yn
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     1
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     -ys
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     4
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     -yjm
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     1024
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     -ytm
>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>         -     1024
>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>         -     -c
>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>         -     eu.amidst.flinklink.examples.WordCountExample
>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>         -     ../flinklink.jar
>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>         -
>> --------------------------------------------------------------------------------
>> 13:31:04,954 INFO  org.apache.flink.client.CliFrontend
>>         - Using configuration directory /home/hadoop/flink-0.10.0/conf
>> 13:31:04,954 INFO  org.apache.flink.client.CliFrontend
>>         - Trying to load configuration file
>> 13:31:05,193 INFO  org.apache.flink.client.CliFrontend
>>         - Running 'run' command.
>> 13:31:05,201 INFO  org.apache.flink.client.CliFrontend
>>         - Building program from JAR file
>> 13:31:05,326 INFO  org.apache.flink.client.CliFrontend
>>         - YARN cluster mode detected. Switching Log4j output to console
>> 13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy
>>         - Connecting to ResourceManager at ip-172-31-33-221.us
>> -west-2.compute.internal/172.31.33.221:8032
>> 13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli
>>         - No path for the flink jar passed. Using the location of class
>> org.apache.flink.yarn.FlinkYarnClient to locate the jar
>> 13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Using values:
>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - TaskManager count = 1
>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - JobManager memory = 1024
>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - TaskManager memory = 1024
>> 13:31:06,112 INFO  org.apache.flink.yarn.Utils
>>         - Copying from
>> file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to
>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
>> 13:31:06,843 INFO  org.apache.flink.yarn.Utils
>>         - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to
>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
>> 13:31:06,857 INFO  org.apache.flink.yarn.Utils
>>         - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to
>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
>> 13:31:06,869 INFO  org.apache.flink.yarn.Utils
>>         - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties
>> to
>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
>> 13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Submitting application master application_1452250761414_0005
>> 13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
>>         - Submitted application application_1452250761414_0005
>> 13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Waiting for the cluster to be allocated
>> 13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Deploying cluster, current state ACCEPTED
>> 13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Deploying cluster, current state ACCEPTED
>> 13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Deploying cluster, current state ACCEPTED
>> 13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - Deploying cluster, current state ACCEPTED
>> 13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient
>>         - YARN application has been deployed successfully.
>> 13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>         - Start actor system.
>> 13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger
>>         - Slf4jLogger started
>> 13:31:11,472 INFO  Remoting
>>         - Starting remoting
>> 13:31:11,698 INFO  Remoting
>>         - Remoting started; listening on addresses :[
>> akka.tcp://flink@172.31.33.221:39464]
>> 13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>         - Start application client.
>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>         - YARN cluster started
>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>         - JobManager web interface address
>> http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>         - Waiting until all TaskManagers have connected
>> 13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Notification about new leader address
>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
>> 13:31:11,752 INFO  org.apache.flink.client.CliFrontend
>>         - No status updates from the YARN cluster received so far. Waiting
>> ...
>> 13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Received address of new leader
>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
>> 13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Disconnect from JobManager null.
>> 13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Trying to register at JobManager
>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
>> 13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Successfully registered at the JobManager Actor[
>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
>> 13:31:12,253 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:12,753 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:13,254 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:13,755 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:14,255 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:14,756 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:15,257 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:15,758 INFO  org.apache.flink.client.CliFrontend
>>         - TaskManager status (0/1)
>> 13:31:16,258 INFO  org.apache.flink.client.CliFrontend
>>         - All TaskManagers are connected
>> 13:31:16,264 INFO  org.apache.flink.client.program.Client
>>         - Starting client actor system
>> 13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient
>>         - Starting JobClient actor system
>> 13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger
>>         - Slf4jLogger started
>> 13:31:16,288 INFO  Remoting
>>         - Starting remoting
>> 13:31:16,301 INFO  Remoting
>>         - Remoting started; listening on addresses :[
>> akka.tcp://flink@127.0.0.1:45919]
>> 13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient
>>         - Started JobClient actor system at 127.0.0.1:45919
>> 13:31:16,302 INFO  org.apache.flink.client.CliFrontend
>>         - Using the parallelism provided by the remote cluster (4). To use
>> another parallelism, set it at the ./bin/flink client.
>> 13:31:16,302 INFO  org.apache.flink.client.CliFrontend
>>         - Starting execution of program
>> 13:31:16,303 INFO  org.apache.flink.client.program.Client
>>         - Starting program in interactive mode
>> 13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample
>>         - Entering application.
>> 13:31:16,342 INFO  TestClass.class
>>         - Logger in TestClass
>> 13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>         - class eu.amidst.flinklink.examples.WordCountExample$TestClass is
>> not a valid POJO type
>> 13:31:16,376 INFO  org.apache.flink.client.CliFrontend
>>         - Program execution finished
>> 13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>         - Shutting down remote daemon.
>> 13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>         - Remote daemon shut down; proceeding with flushing remote
>> transports.
>> 13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>         - Remoting shut down.
>> 13:31:16,431 INFO  org.apache.flink.client.CliFrontend
>>         - Shutting down YARN cluster
>> 13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>         - Sending shutdown request to the Application Master
>> 13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Sending StopYarnSession request to ApplicationMaster.
>> 13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Remote JobManager has been stopped successfully. Stopping local
>> application client
>> 13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Stopped Application client.
>> 13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient
>>         - Disconnect from JobManager Actor[
>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
>> 13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>         - Shutting down remote daemon.
>> 13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>         - Remote daemon shut down; proceeding with flushing remote
>> transports.
>> 13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>         - Remoting shut down.
>> 13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>         - Deleting files in
>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
>> 13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>         - Application application_1452250761414_0005 finished with state
>> FINISHED and final state SUCCEEDED at 1452259876445
>> 13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>         - YARN Client is shutting down
>>
>>
>> You can see the log messages from the WordCountExample and TestClass
>> classes, but the logger message (INFO) in the LineSplitter class does not
>> appear. Presumably this is because it is executed on the CORE nodes and
>> not on the MASTER node (it all runs fine on my local computer).
>>
>> Any tips?
>> Ana
>>
>>
>> On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>
>> Hi Till,
>>
>> I am afraid it does not work in any case.
>>
>> I am following the steps you indicate on your websites (for yarn
>> configuration and loggers with slf4j):
>>
>> 1) Enable log aggregation in yarn-site:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files
>>
>> 2) Include Loggers as indicated here (see WordCountExample below):
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html
>>
>> But I cannot get the log messages that run in the map functions. Am I
>> missing something?
>>
>> Thanks,
>> Ana
>>
>> On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org> wrote:
>>
>> I think the YARN application has to be finished in order for the logs to
>> be accessible.
>>
>> Judging from your commands, you’re starting a long running YARN
>> application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4.
>> This cluster won’t be used though, because you’re executing your job with ./bin/flink
>> run -m yarn-cluster which will start another YARN application which is
>> only alive as long as the Flink job is executed. If you want to run your
>> job on the long running YARN application, then you simply have to omit -m
>> yarn-cluster.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>
>>> Hi Till,
>>>
>>> Sorry for the delay (Xmas break). I have activated log aggregation
>>> in flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find
>>> a yarn-site.xml).
>>> But the command yarn logs -applicationId application_1451903796996_0008
>>> gives me the following output:
>>>
>>> INFO client.RMProxy: Connecting to ResourceManager at xxx
>>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does
>>> not exist.
>>> Log aggregation has not completed or is not enabled
>>>
>>>
>>> I’ve tried to restart the Flink JobManager and TaskManagers as follows:
>>> ./bin/yarn-session.sh -n 1 -tm 2048 -s 4
>>> and then with a detached screen, run my application with ./bin/flink run
>>> -m yarn-cluster ...
>>>
>>> I am not sure if my problem is that I am not setting the
>>> log-aggregation-enable property well or I am not restarting the Flink
>>> JobManager and TaskManagers as I should… Any idea?
>>>
>>> Thanks,
>>> Ana
>>>
>>> On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org> wrote:
>>>
>>> In which log file are you exactly looking for the logging statements?
>>> And on what machine? You have to look on the machines on which the yarn
>>> container were started. Alternatively if you have log aggregation
>>> activated, then you can simply retrieve the log files via yarn logs.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Many thanks for your quick response.
>>>>
>>>> I have modified the WordCountExample to reproduce my problem in a
>>>> simple example.
>>>>
>>>> I run the code below with the following command:
>>>> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>>> mypackage.WordCountExample ../flinklink.jar
>>>>
>>>> And if I check the log file I see all logger messages except the one in
>>>> the flatMap function of the inner LineSplitter class, which is actually the
>>>> one I am most interested in.
>>>>
>>>> Is that an expected behaviour?
>>>>
>>>> Thanks,
>>>> Ana
>>>>
>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>> import org.apache.flink.api.java.DataSet;
>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.util.Collector;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> import java.io.Serializable;
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>>
>>>> public class WordCountExample {
>>>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         logger.info("Entering application.");
>>>>
>>>>     DataSet<String> text = env.fromElements(
>>>>                 "Who's there?",
>>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>>
>>>>         List<Integer> elements = new ArrayList<Integer>();
>>>>         elements.add(0);
>>>>
>>>>
>>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>>
>>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>>                 .flatMap(new LineSplitter())
>>>>                 .withBroadcastSet(set, "set")
>>>>                 .groupBy(0)
>>>>                 .sum(1);
>>>>
>>>>         wordCounts.print();
>>>>
>>>>
>>>>     }
>>>>
>>>>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>>
>>>>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>>>>
>>>>         @Override
>>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>>             for (String word : line.split(" ")) {
>>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>     public static class TestClass implements Serializable {
>>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>>
>>>>         static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");
>>>>
>>>>         List<Integer> integerList;
>>>>         public TestClass(List<Integer> integerList){
>>>>             this.integerList=integerList;
>>>>             loggerTestClass.info("Logger in TestClass");
>>>>         }
>>>>
>>>>
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:
>>>>
>>>> Hi Ana,
>>>>
>>>> you can simply modify the `log4j.properties` file in the `conf`
>>>> directory. It should be automatically included in the Yarn application.
>>>>
>>>> Concerning your logging problem, it might be that you have set the
>>>> logging level too high. Could you share the code with us?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>>
>>>>> Hi flink community,
>>>>>
>>>>> I am trying to show log messages using log4j.
>>>>> It works fine overall except for the messages I want to show in an
>>>>> inner class that implements
>>>>> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
>>>>> I am very new to this, but it seems that I’m having problems to show
>>>>> the messages included in the isConverged function, as it runs in the task
>>>>> managers?
>>>>> E.g. the log messages in the outer class (before map-reduce
>>>>> operations) are properly shown.
>>>>>
>>>>> I am also interested in providing my own log4j.properties file. I am
>>>>> using the ./bin/flink run -m yarn-cluster on Amazon clusters.
>>>>>
>>>>> Thanks,
>>>>> Ana
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>

Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Hi Till,

Thanks for that! I can see the "Logger in LineSplitter.flatMap" output if I retrieve the task manager logs manually (under /var/log/hadoop-yarn/containers/application_X/…). However, that solution is not ideal when, for instance, I am using 32 machines for my map-reduce operations.

I would like to know why Yarn’s log aggregation is not working. Can you tell me how to check whether there are still Yarn containers running after the Flink job has finished? I have tried:
hadoop job -list
but I cannot see any jobs there, although I am not sure that means there are no containers running.
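
In case it helps, a couple of standard YARN CLI commands (available in Hadoop 2.x; hadoop job -list only covers MapReduce jobs, not Flink's YARN containers) that should reveal whether any applications or containers are still alive:

```
# List YARN applications that are still in the RUNNING state.
yarn application -list -appStates RUNNING

# Show the NodeManagers, including how many containers each is running.
yarn node -list
```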

Thanks,
Ana

On 08 Jan 2016, at 16:24, Till Rohrmann <tr...@apache.org> wrote:


You’re right that the log statements of the LineSplitter are in the logs of the cluster nodes, because that’s where the LineSplitter code is executed. In contrast, you create a TestClass on the client when you submit the program. Therefore, you see the logging statement “Logger in TestClass” on the command line or in the cli log file.

So I would assume that the problem is Yarn’s log aggregation. Either your configuration is not correct, or there are still some Yarn containers running after the Flink job has finished. Yarn will only show you the logs after all containers are terminated. Maybe you could check that. Alternatively, you can try to retrieve the taskmanager logs manually by going to the machine where your yarn container was executed; the logs should then be somewhere under hadoop/logs/userlogs.

Cheers,
Till


On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Thanks for the tip Robert! It was a good idea to rule out other possible causes, but I am afraid that is not the problem. If we stick to the WordCountExample (for simplicity), the Exception is thrown if placed into the flatMap function.

I am going to try to re-write my problem and all the settings below:

When I try to aggregate all logs:
 $yarn logs -applicationId application_1452250761414_0005

the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does not exist.
Log aggregation has not completed or is not enabled.

(Tried the same command a few minutes later and got the same message, so might it be that log aggregation is not properly enabled??)

I am going to carefully enumerate all the steps I have followed (and settings) to see if someone can identify why the Logger messages from CORE nodes (in an Amazon cluster) are not shown.

1) Enable yarn.log-aggregation-enable property to true in /etc/alternatives/hadoop-conf/yarn-site.xml.
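
For reference, a minimal sketch of what that yarn-site.xml entry looks like (the property name is the standard Hadoop one; exact file location varies per distribution):

```xml
<!-- Enable YARN log aggregation: NodeManagers upload finished
     container logs to a shared filesystem (e.g. HDFS), where
     "yarn logs -applicationId ..." can read them. -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```

Note that the YARN NodeManagers generally have to be restarted for this change to take effect, which could explain aggregation still appearing disabled.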

2) Include log messages in my WordCountExample as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");

                for (String word : line.split(" ")) {
                        out.collect(new Tuple2<String, Integer>(word, 1));
                        //throw new RuntimeException("LineSplitter class called");
                }

        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}

3) Start a yarn-cluster and execute my program with the following command:

$./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar


4) The output in the log folder is as follows:

13:31:04,945 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Current user: hadoop
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.65-b01
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Maximum heap size: 3344 MiBytes
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JAVA_HOME: /etc/alternatives/jre
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  Hadoop version: 2.6.0
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  JVM Options:
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -  Program Arguments:
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     run
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -m
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     yarn-cluster
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yn
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ys
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     4
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yjm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ytm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     -c
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     eu.amidst.flinklink.examples.WordCountExample
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     ../flinklink.jar
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Using configuration directory /home/hadoop/flink-0.10.0/conf
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Trying to load configuration file
13:31:05,193 INFO  org.apache.flink.client.CliFrontend                           - Running 'run' command.
13:31:05,201 INFO  org.apache.flink.client.CliFrontend                           - Building program from JAR file
13:31:05,326 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster mode detected. Switching Log4j output to console
13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli                   - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.FlinkYarnClient to locate the jar
13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager count = 1
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - JobManager memory = 1024
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager memory = 1024
13:31:06,112 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
13:31:06,843 INFO  org.apache.flink.yarn.Utils                                   - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
13:31:06,857 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
13:31:06,869 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Submitting application master application_1452250761414_0005
13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1452250761414_0005
13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Waiting for the cluster to be allocated
13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:11,472 INFO  Remoting                                                      - Starting remoting
13:31:11,698 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@172.31.33.221:39464]
13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster started
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - JobManager web interface address http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - Waiting until all TaskManagers have connected
13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,752 INFO  org.apache.flink.client.CliFrontend                           - No status updates from the YARN cluster received so far. Waiting ...
13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient                       - Successfully registered at the JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
13:31:12,253 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:12,753 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,254 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,755 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,255 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,756 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,257 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,758 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:16,258 INFO  org.apache.flink.client.CliFrontend                           - All TaskManagers are connected
13:31:16,264 INFO  org.apache.flink.client.program.Client                        - Starting client actor system
13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient                     - Starting JobClient actor system
13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:16,288 INFO  Remoting                                                      - Starting remoting
13:31:16,301 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:45919]
13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient                     - Started JobClient actor system at 127.0.0.1:45919
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Using the parallelism provided by the remote cluster (4). To use another parallelism, set it at the ./bin/flink client.
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Starting execution of program
13:31:16,303 INFO  org.apache.flink.client.program.Client                        - Starting program in interactive mode
13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample                 - Entering application.
13:31:16,342 INFO  TestClass.class                                               - Logger in TestClass
13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class eu.amidst.flinklink.examples.WordCountExample$TestClass is not a valid POJO type
13:31:16,376 INFO  org.apache.flink.client.CliFrontend                           - Program execution finished
13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,431 INFO  org.apache.flink.client.CliFrontend                           - Shutting down YARN cluster
13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Sending shutdown request to the Application Master
13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopYarnSession request to ApplicationMaster.
13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient                       - Remote JobManager has been stopped successfully. Stopping local application client
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Stopped Application client.
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Deleting files in hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Application application_1452250761414_0005 finished with state FINISHED and final state SUCCEEDED at 1452259876445
13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - YARN Client is shutting down


You can see the log messages from the WordCountExample and TestClass classes. But I have problems showing the logger message (INFO) in the LineSplitter class. Presumably that is because it is executed on the CORE nodes and not on the MASTER node (it all runs well on my local computer).

Any tips?
Ana


On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk> wrote:

Hi Till,

I am afraid it does not work in any case.

I am following the steps you indicate on your websites (for yarn configuration and loggers with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I cannot get the log messages that run in the map functions. Am I missing something?

Thanks,
Ana

On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org> wrote:


I think the YARN application has to be finished in order for the logs to be accessible.

Judging from your commands, you’re starting a long-running YARN application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster won’t be used, though, because you’re executing your job with ./bin/flink run -m yarn-cluster, which starts another YARN application that is only alive as long as the Flink job is executed. If you want to run your job on the long-running YARN application, simply omit -m yarn-cluster.

Cheers,
Till


On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008 does not exist.
Log aggregation has not completed or is not enabled

I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...

I am not sure whether the problem is that I am not setting the log-aggregation-enable property correctly, or that I am not restarting the Flink JobManager and TaskManagers as I should… Any ideas?
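For reference, log aggregation is a YARN setting rather than a Flink one, so it belongs in yarn-site.xml, not flink-conf.yaml. A minimal sketch of the relevant properties (the aggregation directory value is illustrative; it matches the path in the error message above):

```xml
<!-- yarn-site.xml, on all nodes; restart the NodeManagers afterwards -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<!-- HDFS directory where finished-container logs are collected -->
<property>
  <name>yarn.nodemanager.remote-app-log-dir</name>
  <value>/var/log/hadoop-yarn/apps</value>
</property>
```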

Thanks,
Ana

On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org> wrote:

In which log file are you exactly looking for the logging statements? And on what machine? You have to look on the machines on which the yarn container were started. Alternatively if you have log aggregation activated, then you can simply retrieve the log files via yarn logs.

Cheers,
Till

On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi Till,

Many thanks for your quick response.

I have modified the WordCountExample to re-reproduce my problem in a simple example.

I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar

And if I check the log file I see all logger messages except the one in the flatMap function of the inner LineSplitter class, which is actually the one I am most interested in.

Is that an expected behaviour?

Thanks,
Ana


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}



On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:

Hi Ana,

you can simply modify the `log4j.properties` file in the `conf` directory. It should be automatically included in the Yarn application.

Concerning your logging problem, it might be that you have set the logging level too high. Could you share the code with us?
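For comparison, a minimal conf/log4j.properties sketch along the lines of Flink's shipped defaults, keeping INFO-level statements from user classes (the appender name and conversion pattern are illustrative, not mandatory):

```properties
# Root logger at INFO so statements like logger.info(...) are kept
log4j.rootLogger=INFO, file

# Write to the per-process file Flink passes in via -Dlog.file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

A package set to a higher level than INFO (e.g. log4j.logger.eu.amidst=WARN) would suppress exactly the kind of messages being looked for here.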

Cheers,
Till

On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
Hi flink community,

I am trying to show log messages using log4j.
It works fine overall except for the messages I want to show in an inner class that implements org.apache.flink.api.common.aggregators.ConvergenceCriterion.
I am very new to this, but it seems that I’m having problems to show the messages included in the isConverged function, as it runs in the task managers?
E.g. the log messages in the outer class (before map-reduce operations) are properly shown.

I am also interested in providing my own log4j.properties file. I am using the ./bin/flink run -m yarn-cluster on Amazon clusters.

Thanks,
Ana










Re: Problem to show logs in task managers

Posted by Till Rohrmann <tr...@apache.org>.
You’re right that the log statements of the LineSplitter are in the logs of
the cluster nodes, because that’s where the LineSplitter code is executed.
In contrast, you create a TestClass on the client when you submit the
program. Therefore, you see the logging statement “Logger in TestClass” on
the command line or in the cli log file.

So I would assume that the problem is Yarn’s log aggregation. Either your
configuration is not correct or there are still some Yarn containers
running after the Flink job has finished. Yarn will only show you the logs
after all containers are terminated. Maybe you could check that.
Alternatively, you can try to retrieve the taskmanager logs manually by
going to the machine where your yarn container was executed. There, under
hadoop/logs/userlogs, you should find the logs.
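A sketch of how one might look for those container logs on a worker node. The root path is an assumption and varies by distribution (plain Hadoop keeps them under $HADOOP_HOME/logs/userlogs, EMR under /var/log/hadoop-yarn/containers):

```shell
# Locate Flink TaskManager log files left behind by YARN containers.
# LOG_ROOT is an assumed default; override it for your installation.
LOG_ROOT="${LOG_ROOT:-/var/log/hadoop-yarn/containers}"
find "$LOG_ROOT" -type f -name 'taskmanager.log*' 2>/dev/null
```

If nothing turns up, the containers may already have been cleaned up, in which case the aggregated logs via yarn logs are the only option.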

Cheers,
Till

On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:

> Thanks for the tip Robert! It was a good idea to rule out other possible
> causes, but I am afraid that is not the problem. If we stick to the
> WordCountExample (for simplicity), the Exception is thrown if placed into
> the flatMap function.
>
> I am going to try to re-write my problem and all the settings below:
>
> When I try to aggregate all logs:
>  $yarn logs -applicationId application_1452250761414_0005
>
> the following message is retrieved:
> 16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at
> ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
> /var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does
> not exist.
> Log aggregation has not completed or is not enabled.
>
> (Tried the same command a few minutes later and got the same message, so
> might it be that log aggregation is not properly enabled??)
>
> I am going to carefully enumerate all the steps I have followed (and
> settings) to see if someone can identify why the Logger messages from CORE
> nodes (in an Amazon cluster) are not shown.
>
> 1) Enable yarn.log-aggregation-enable property to true
> in /etc/alternatives/hadoop-conf/yarn-site.xml.
>
> 2) Include log messages in my WordCountExample as follows:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
>
>
> public class WordCountExample {
>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         logger.info("Entering application.");
>
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
>
>         List<Integer> elements = new ArrayList<Integer>();
>         elements.add(0);
>
>
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
>
>         wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);
>
>
>     }
>
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>
>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>
> 		for (String word : line.split(" ")) {
>     			out.collect(new Tuple2<String, Integer>(word, 1));
>     			//throw new RuntimeException("LineSplitter class called");
> 		}
>
>         }
>     }
>
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>
>         static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");
>
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList){
>             this.integerList=integerList;
>             loggerTestClass.info("Logger in TestClass");
>         }
>
>
>     }
> }
>
> 3) Start a yarn-cluster and execute my program with the following command:
>
> $./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
> eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar
>
>
> 4) The output in the log folder is as follows:
>
> [...]
>
> You can see the log messages from the WordCountExample and TestClass
> classes. But I have problems showing the logger message (INFO) in the
> LineSplitter class, presumably because it is executed on the CORE nodes
> and not on the MASTER node (it all runs fine on my local computer).
>
> Any tips?
> Ana
>
>
> On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk> wrote:
>
> Hi Till,
>
> I am afraid it does not work in any case.
>
> I am following the steps you indicate on your websites (for yarn
> configuration and loggers with slf4j):
>
> 1) Enable log aggregation in yarn-site:
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files
>
> 2) Include Loggers as indicated here (see WordCountExample below):
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html
>
> But I cannot get the log messages that run in the map functions. Am I
> missing something?
>
> Thanks,
> Ana
>
> On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org> wrote:
>
> I think the YARN application has to be finished in order for the logs to
> be accessible.
>
> Judging from your commands, you’re starting a long-running YARN application
> running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster
> won’t be used though, because you’re executing your job with ./bin/flink
> run -m yarn-cluster which will start another YARN application which is
> only alive as long as the Flink job is executed. If you want to run your
> job on the long running YARN application, then you simply have to omit -m
> yarn-cluster.
>
> Cheers,
> Till
>
> On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>
>> Hi Till,
>>
>> Sorry for the delay (Xmas break). I have activated log aggregation
>> on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find
>> a yarn-site.xml).
>> But the command yarn logs -applicationId application_1451903796996_0008
>> gives me the following output:
>>
>> INFO client.RMProxy: Connecting to ResourceManager at xxx
>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does
>> not exist.
>> Log aggregation has not completed or is not enabled
>>
>>
>> I’ve tried to restart the Flink JobManager and TaskManagers as follows:
>> ./bin/yarn-session.sh -n 1 -tm 2048 -s 4
>> and then with a detached screen, run my application with ./bin/flink run
>> -m yarn-cluster ...
>>
>> I am not sure if my problem is that I am not setting the
>> log-aggregation-enable property well or I am not restarting the Flink
>> JobManager and TaskManagers as I should… Any idea?
>>
>> Thanks,
>> Ana
>>
>> On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org> wrote:
>>
>> In which log file are you exactly looking for the logging statements? And
>> on what machine? You have to look on the machines on which the yarn
>> container were started. Alternatively if you have log aggregation
>> activated, then you can simply retrieve the log files via yarn logs.
>>
>> Cheers,
>> Till
>>
>> On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>
>>> Hi Till,
>>>
>>> Many thanks for your quick response.
>>>
>>> I have modified the WordCountExample to reproduce my problem in a
>>> simple example.
>>>
>>> I run the code below with the following command:
>>> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>> mypackage.WordCountExample ../flinklink.jar
>>>
>>> And if I check the log file I see all logger messages except the one in
>>> the flatMap function of the inner LineSplitter class, which is actually the
>>> one I am most interested in.
>>>
>>> Is that an expected behaviour?
>>>
>>> Thanks,
>>> Ana
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>> public class WordCountExample {
>>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         logger.info("Entering application.");
>>>
>>>     DataSet<String> text = env.fromElements(
>>>                 "Who's there?",
>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>
>>>         List<Integer> elements = new ArrayList<Integer>();
>>>         elements.add(0);
>>>
>>>
>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>
>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>                 .flatMap(new LineSplitter())
>>>                 .withBroadcastSet(set, "set")
>>>                 .groupBy(0)
>>>                 .sum(1);
>>>
>>>         wordCounts.print();
>>>
>>>
>>>     }
>>>
>>>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>>>
>>>         static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);
>>>
>>>         @Override
>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>             for (String word : line.split(" ")) {
>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>             }
>>>         }
>>>     }
>>>
>>>     public static class TestClass implements Serializable {
>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>
>>>         static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");
>>>
>>>         List<Integer> integerList;
>>>         public TestClass(List<Integer> integerList){
>>>             this.integerList=integerList;
>>>             loggerTestClass.info("Logger in TestClass");
>>>         }
>>>
>>>
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>> On 17 Dec 2015, at 16:08, Till Rohrmann <tr...@apache.org> wrote:
>>>
>>> Hi Ana,
>>>
>>> you can simply modify the `log4j.properties` file in the `conf`
>>> directory. It should be automatically included in the Yarn application.
>>>
>>> Concerning your logging problem, it might be that you have set the
>>> logging level too high. Could you share the code with us?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <an...@cs.aau.dk> wrote:
>>>
>>>> Hi flink community,
>>>>
>>>> I am trying to show log messages using log4j.
>>>> It works fine overall except for the messages I want to show in an
>>>> inner class that implements
>>>> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
>>>> I am very new to this, but it seems that I’m having problems to show
>>>> the messages included in the isConverged function, as it runs in the task
>>>> managers?
>>>> E.g. the log messages in the outer class (before map-reduce operations)
>>>> are properly shown.
>>>>
>>>> I am also interested in providing my own log4j.properties file. I am
>>>> using the ./bin/flink run -m yarn-cluster on Amazon clusters.
>>>>
>>>> Thanks,
>>>> Ana
>>>
>>>
>>>
>>>
>>
>>
>
>
>

Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Thanks for the tip Robert! It was a good idea to rule out other possible causes, but I am afraid that is not the problem. If we stick to the WordCountExample (for simplicity), the exception is thrown when placed in the flatMap function.

I am going to restate my problem and all the settings below:

When I try to aggregate all logs:
 $yarn logs -applicationId application_1452250761414_0005

the following message is retrieved:
16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
/var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does not exist.
Log aggregation has not completed or is not enabled.

(I tried the same command a few minutes later and got the same message, so could it be that log aggregation is not properly enabled?)

I am going to carefully enumerate all the steps I have followed (and settings) to see if someone can identify why the Logger messages from CORE nodes (in an Amazon cluster) are not shown.

1) Enable yarn.log-aggregation-enable property to true in /etc/alternatives/hadoop-conf/yarn-site.xml.
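For reference, the property from step 1 is a standard YARN setting; a sketch of the relevant yarn-site.xml fragment (the file location varies by distribution) might look like:

```xml
<!-- Sketch only: enables aggregation of container logs into HDFS
     once an application finishes. -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```

Note that the NodeManagers typically need a restart for the change to take effect.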

2) Include log messages in my WordCountExample as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;


public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.writeAsText("output.txt", FileSystem.WriteMode.OVERWRITE);


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");

                for (String word : line.split(" ")) {
                        out.collect(new Tuple2<String, Integer>(word, 1));
                        //throw new RuntimeException("LineSplitter class called");
                }

        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("TestClass.class");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}
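A side note on why the LineSplitter log line can only ever appear on the task managers: the function object is serialized and shipped to the workers, and static fields such as the Logger are not part of Java serialization, so the logger is re-created inside each task manager JVM and writes to that JVM's log. A minimal stdlib-only sketch (class and field names are illustrative, not from the thread):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldDemo {
    // Stand-in for a user function that gets shipped to a remote worker.
    static class Fn implements Serializable {
        static String logger = "created-in-this-JVM"; // static: never serialized
        int id = 42;                                  // instance state: serialized
    }

    public static void main(String[] args) throws Exception {
        // Serialize, as the client does when shipping a FlatMapFunction.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new Fn());

        // Deserialize, as the task manager does on the remote side.
        Fn copy = (Fn) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.id);   // instance field travelled with the object
        System.out.println(Fn.logger); // static field comes from the local JVM
    }
}
```

So the INFO line from flatMap is written through the task manager's own logging setup, which is why it ends up in the YARN container logs (retrievable with yarn logs) rather than in the client log shown below.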

3) Start a yarn-cluster and execute my program with the following command:

$./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar


4) The output in the log folder is as follows:

13:31:04,945 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Current user: hadoop
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.65-b01
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  Maximum heap size: 3344 MiBytes
13:31:04,947 INFO  org.apache.flink.client.CliFrontend                           -  JAVA_HOME: /etc/alternatives/jre
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  Hadoop version: 2.6.0
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -  JVM Options:
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
13:31:04,950 INFO  org.apache.flink.client.CliFrontend                           -     -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -  Program Arguments:
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     run
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -m
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     yarn-cluster
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yn
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ys
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     4
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -yjm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     -ytm
13:31:04,951 INFO  org.apache.flink.client.CliFrontend                           -     1024
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     -c
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     eu.amidst.flinklink.examples.WordCountExample
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           -     ../flinklink.jar
13:31:04,952 INFO  org.apache.flink.client.CliFrontend                           - --------------------------------------------------------------------------------
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Using configuration directory /home/hadoop/flink-0.10.0/conf
13:31:04,954 INFO  org.apache.flink.client.CliFrontend                           - Trying to load configuration file
13:31:05,193 INFO  org.apache.flink.client.CliFrontend                           - Running 'run' command.
13:31:05,201 INFO  org.apache.flink.client.CliFrontend                           - Building program from JAR file
13:31:05,326 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster mode detected. Switching Log4j output to console
13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli                   - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.FlinkYarnClient to locate the jar
13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Using values:
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager count = 1
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - JobManager memory = 1024
13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient                         - TaskManager memory = 1024
13:31:06,112 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
13:31:06,843 INFO  org.apache.flink.yarn.Utils                                   - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
13:31:06,857 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
13:31:06,869 INFO  org.apache.flink.yarn.Utils                                   - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties to hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Submitting application master application_1452250761414_0005
13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1452250761414_0005
13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Waiting for the cluster to be allocated
13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:11,472 INFO  Remoting                                                      - Starting remoting
13:31:11,698 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@172.31.33.221:39464]
13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - YARN cluster started
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - JobManager web interface address http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
13:31:11,737 INFO  org.apache.flink.client.CliFrontend                           - Waiting until all TaskManagers have connected
13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,752 INFO  org.apache.flink.client.CliFrontend                           - No status updates from the YARN cluster received so far. Waiting ...
13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID null.
13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient                       - Successfully registered at the JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
13:31:12,253 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:12,753 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,254 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:13,755 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,255 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:14,756 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,257 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:15,758 INFO  org.apache.flink.client.CliFrontend                           - TaskManager status (0/1)
13:31:16,258 INFO  org.apache.flink.client.CliFrontend                           - All TaskManagers are connected
13:31:16,264 INFO  org.apache.flink.client.program.Client                        - Starting client actor system
13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient                     - Starting JobClient actor system
13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
13:31:16,288 INFO  Remoting                                                      - Starting remoting
13:31:16,301 INFO  Remoting                                                      - Remoting started; listening on addresses :[akka.tcp://flink@127.0.0.1:45919]
13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient                     - Started JobClient actor system at 127.0.0.1:45919
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Using the parallelism provided by the remote cluster (4). To use another parallelism, set it at the ./bin/flink client.
13:31:16,302 INFO  org.apache.flink.client.CliFrontend                           - Starting execution of program
13:31:16,303 INFO  org.apache.flink.client.program.Client                        - Starting program in interactive mode
13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample                 - Entering application.
13:31:16,342 INFO  TestClass.class                                               - Logger in TestClass
13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class eu.amidst.flinklink.examples.WordCountExample$TestClass is not a valid POJO type
13:31:16,376 INFO  org.apache.flink.client.CliFrontend                           - Program execution finished
13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,431 INFO  org.apache.flink.client.CliFrontend                           - Shutting down YARN cluster
13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Sending shutdown request to the Application Master
13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient                       - Sending StopYarnSession request to ApplicationMaster.
13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient                       - Remote JobManager has been stopped successfully. Stopping local application client
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Stopped Application client.
13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager Actor[akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Deleting files in hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Application application_1452250761414_0005 finished with state FINISHED and final state SUCCEEDED at 1452259876445
13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - YARN Client is shutting down


You can see the log messages from the WordCountExample and TestClass classes. But I have problems showing the logger message (INFO) in the LineSplitter class, presumably because it is executed on the CORE nodes and not on the MASTER node (it all runs fine on my local computer).

Any tips?
Ana
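Regarding the custom log4j.properties question from the start of the thread: Till's advice was to edit the file in Flink's conf directory, and the "Copying ... conf/log4j.properties" line in the output above shows that this file is indeed uploaded with the YARN application. A minimal sketch of such a file (appender name and pattern are illustrative; log.file is set by Flink's startup scripts, as visible in the JVM options above):

```properties
# Sketch only: route everything at INFO and above to the file that
# Flink's scripts point at via -Dlog.file.
log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

Raising or lowering the rootLogger level here affects both the job manager and the task managers, since the same file is shipped to all containers.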


On 06 Jan 2016, at 15:58, Ana M. Martinez <an...@cs.aau.dk>> wrote:

Hi Till,

I am afraid it does not work in any case.

I am following the steps you indicate on your websites (for yarn configuration and loggers with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I cannot get the log messages that run in the map functions. Am I missing something?

Thanks,
Ana

On 04 Jan 2016, at 14:00, Till Rohrmann <tr...@apache.org>> wrote:


I think the YARN application has to be finished in order for the logs to be accessible.

Judging from your commands, you’re starting a long-running YARN application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster won’t be used though, because you’re executing your job with ./bin/flink run -m yarn-cluster, which will start another YARN application that is only alive as long as the Flink job is executing. If you want to run your job on the long-running YARN application, then you simply have to omit -m yarn-cluster.

Cheers,
Till


On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <an...@cs.aau.dk>> wrote:
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does not exist.
Log aggregation has not completed or is not enabled

I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...

I am not sure if my problem is that I am not setting the log-aggregation-enable property well or I am not restarting the Flink JobManager and TaskManagers as I should… Any idea?

Thanks,
Ana

On 18 Dec 2015, at 16:29, Till Rohrmann <tr...@apache.org>> wrote:

In which log file are you exactly looking for the logging statements? And on what machine? You have to look on the machines on which the yarn container were started. Alternatively if you have log aggregation activated, then you can simply retrieve the log files via yarn logs.

Cheers,
Till

On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <an...@cs.aau.dk>> wrote:
Hi Till,

Many thanks for your quick response.

I have modified the WordCountExample to reproduce my problem in a simple example.

I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar

And if I check the log file I see all logger messages except the one in the flatMap function of the inner LineSplitter class, which is actually the one I am most interested in.

Is that an expected behaviour?

Thanks,
Ana


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

    DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}




Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Hi Till,

I am afraid it does not work in either case.

I am following the steps you indicate on your websites (for yarn configuration and loggers with slf4j):

1) Enable log aggregation in yarn-site:
https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files

2) Include Loggers as indicated here (see WordCountExample below):
https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html

But I cannot see the log messages emitted from within the map functions. Am I missing something?

Thanks,
Ana


Re: Problem to show logs in task managers

Posted by Till Rohrmann <tr...@apache.org>.
I think the YARN application has to be finished in order for the logs to be
accessible.

Judging from your commands, you’re starting a long-running YARN application
running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. This cluster
won’t be used though, because you’re executing your job with ./bin/flink
run -m yarn-cluster which will start another YARN application which is only
alive as long as the Flink job is executed. If you want to run your job on
the long running YARN application, then you simply have to omit -m
yarn-cluster.

Cheers,
Till

Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Hi Till,

Sorry for the delay (Xmas break). I have activated log aggregation on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find a yarn-site.xml).
But the command yarn logs -applicationId application_1451903796996_0008 gives me the following output:

INFO client.RMProxy: Connecting to ResourceManager at xxx
/var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008 does not exist.
Log aggregation has not completed or is not enabled
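
[For reference: `yarn.log-aggregation-enable` is a Hadoop YARN property, not a Flink one, so setting it in `flink-conf.yaml` has no effect. It normally belongs in `yarn-site.xml` in the Hadoop configuration directory (the exact path varies by distribution), and the NodeManagers must be restarted afterwards. A minimal sketch of the relevant entry:]

```xml
<!-- yarn-site.xml: enable YARN log aggregation so "yarn logs" works
     after the application finishes. -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```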

I’ve tried to restart the Flink JobManager and TaskManagers as follows:
./bin/yarn-session.sh -n 1 -tm 2048 -s 4
and then with a detached screen, run my application with ./bin/flink run -m yarn-cluster ...

I am not sure if my problem is that I am not setting the log-aggregation-enable property well or I am not restarting the Flink JobManager and TaskManagers as I should… Any idea?

Thanks,
Ana


Re: Problem to show logs in task managers

Posted by Till Rohrmann <tr...@apache.org>.
In which log file are you exactly looking for the logging statements? And
on what machine? You have to look on the machines on which the YARN
containers were started. Alternatively, if you have log aggregation
activated, then you can simply retrieve the log files via yarn logs.

Cheers,
Till


Re: Problem to show logs in task managers

Posted by "Ana M. Martinez" <an...@cs.aau.dk>.
Hi Till,

Many thanks for your quick response.

I have modified the WordCountExample to reproduce my problem in a simple example.

I run the code below with the following command:
./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c mypackage.WordCountExample ../flinklink.jar

And if I check the log file I see all logger messages except the one in the flatMap function of the inner LineSplitter class, which is actually the one I am most interested in.

Is that the expected behaviour?

Thanks,
Ana


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);


        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();


    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;
        public TestClass(List<Integer> integerList){
            this.integerList=integerList;
            loggerTestClass.info("Logger in TestClass");
        }


    }
}






Re: Problem to show logs in task managers

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ana,

you can simply modify the `log4j.properties` file in the `conf` directory.
It should be automatically included in the Yarn application.
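
[As an illustration — a sketch of the stock file, not an exact copy, since the entries vary between Flink versions — `conf/log4j.properties` might look like this. The `mypackage` logger line is an example addition for the user classes from the job above:]

```properties
# Root logger: INFO and above, written to the file passed in via -Dlog.file
log4j.rootLogger=INFO, file

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Lower the threshold for your own classes if their messages are filtered out
log4j.logger.mypackage=DEBUG
```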

Concerning your logging problem, it might be that you have set the logging
level too high. Could you share the code with us?

Cheers,
Till
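
[Coming back to the original question about logging inside `isConverged()`: the sketch below shows the shape of such a criterion. A stand-in interface is declared locally so the snippet compiles on its own; in a real job you would implement `org.apache.flink.api.common.aggregators.ConvergenceCriterion` (same `isConverged(int, T)` shape) and log via SLF4J rather than `System.out`.]

```java
public class ConvergenceSketch {

    // Stand-in for Flink's ConvergenceCriterion<T extends Value>;
    // declared here only so this example is self-contained.
    interface ConvergenceCriterion<T> {
        boolean isConverged(int iteration, T value);
    }

    // Converges once the aggregated delta drops below a fixed threshold.
    static class DeltaCriterion implements ConvergenceCriterion<Double> {
        private final double threshold;

        DeltaCriterion(double threshold) {
            this.threshold = threshold;
        }

        @Override
        public boolean isConverged(int iteration, Double value) {
            // This runs on the task managers, so the message lands in the
            // task manager logs (or the YARN-aggregated logs), not on the
            // client that submitted the job.
            System.out.println("iteration " + iteration + ", delta = " + value);
            return value < threshold;
        }
    }

    public static void main(String[] args) {
        DeltaCriterion criterion = new DeltaCriterion(0.01);
        System.out.println(criterion.isConverged(1, 0.5));
        System.out.println(criterion.isConverged(7, 0.001));
    }
}
```

[The second call reports convergence because the value is below the threshold; in a Flink iteration, the criterion is evaluated against the aggregator value at the end of each superstep.]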
