Posted to user@flink.apache.org by "Conlin, Joshua [USA]" <co...@bah.com> on 2016/04/26 20:16:26 UTC

Consuming Messages from Kafka

Hello,

I am new to Flink and trying to learn the framework. It seems great so far. I am trying to port my existing Storm topology to a Flink job, and I am having trouble consuming data from Kafka. Here's what my job looks like:


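import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaStreamJob { // class name is illustrative

    public static void main(String[] args) throws Exception {
        // Kafka 0.9 consumer settings; "hostname:port" is a placeholder for the broker address
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hostname:port");
        properties.setProperty("group.id", "stream-test");
        properties.setProperty("client.id", "test-flink");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read topic "test" and deserialize each record as a String
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

        // StringLogSink is my own sink; it prints and logs each message
        kafkaStream.addSink(new StringLogSink());

        env.execute();
    }
}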


Messages are being sent to Kafka on that topic, but I never see anything in Flink. Any help or insight you could provide would be greatly appreciated. In case it makes a difference, this is running on YARN. Also, here's what I see in the logs:


2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.9.0.1
2016-04-26 18:02:38,707 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 23c69d62a0cabf06
2016-04-26 18:02:38,708 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Trying to get partitions for topic test
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 1 partitions from these topics: [test]
2016-04-26 18:02:38,854 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer is going to read the following topics (with number of partitions): test (1),
2016-04-26 18:02:38,933 INFO  org.apache.flink.yarn.YarnJobManager                          - Submitting job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,934 INFO  org.apache.flink.yarn.YarnJobManager                          - Using restart strategy NoRestartStrategy for 0ab4248d8917e707a8f297420e4c564d.
2016-04-26 18:02:38,935 INFO  org.apache.flink.yarn.YarnJobManager                          - Scheduling job 0ab4248d8917e707a8f297420e4c564d ().
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from CREATED to SCHEDULED
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from SCHEDULED to DEPLOYING
2016-04-26 18:02:38,935 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #0) to ip-10-167-233-231
2016-04-26 18:02:38,936 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 0ab4248d8917e707a8f297420e4c564d () changed to RUNNING.
2016-04-26 18:02:39,151 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Unnamed (1/1) (9bd0abdf59a1af934527257a990910ff) switched from DEPLOYING to RUNNING

Thanks,

Josh

Re: [External] Re: Consuming Messages from Kafka

Posted by Robert Metzger <rm...@apache.org>.
Hi Josh,

The JobManager log won't contain this output. The sink runs inside the
TaskManagers, so its stdout and slf4j output end up in the TaskManager logs.

Check out these slides I made a while ago; they explain how to retrieve
the logs from the TaskManagers:
http://www.slideshare.net/robertmetzger1/apache-flink-hands-on#14



On Tue, Apr 26, 2016 at 9:41 PM, Conlin, Joshua [USA] <conlin_joshua@bah.com> wrote:

Re: [External] Re: Consuming Messages from Kafka

Posted by "Conlin, Joshua [USA]" <co...@bah.com>.
"StringLogSink" just looks like:


System.out.println(msg);

LOG.info("Logging message: " + msg);


LOG is an slf4j logger. In the Flink UI running on YARN, I see no counts, and no log statements or stdout under the JobManager. It makes no difference whether I submit the job to YARN via the command line or through the Flink UI of the session already running on YARN. Where in the YARN containers would you recommend I look?
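The full sink was not posted. A minimal sketch of a StringLogSink matching the two lines above might look like the following, assuming the Flink 1.0-era SinkFunction interface and an slf4j binding on the classpath. The format helper is hypothetical and only exists so the message text can be checked without a running job. Because invoke() executes inside the TaskManagers, both outputs are produced in the TaskManager containers rather than under the JobManager.

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical reconstruction of the StringLogSink described above.
public class StringLogSink implements SinkFunction<String> {

    // Static, so it is not serialized with the sink when the job is shipped to the TaskManagers
    private static final Logger LOG = LoggerFactory.getLogger(StringLogSink.class);

    // Hypothetical helper: builds the log line so it can be checked in isolation
    static String format(String msg) {
        return "Logging message: " + msg;
    }

    @Override
    public void invoke(String msg) throws Exception {
        // Written to the TaskManager process's stdout (on YARN, a .out file in the container's log dir)
        System.out.println(msg);
        // Written through slf4j to the TaskManager's log file
        LOG.info(format(msg));
    }
}
```

With source and sink chained, this code runs in the same task as the Kafka consumer, so if nothing appears in the TaskManager .out or .log files, no records reached the sink.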


Thanks again for your help.


Josh

From: Robert Metzger <rm...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, April 26, 2016 at 3:30 PM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: [External] Re: Consuming Messages from Kafka



Re: Consuming Messages from Kafka

Posted by Robert Metzger <rm...@apache.org>.
Hi,

The web interface is a good way to check whether everything is working as
expected. However, in this case I expect the counts for the task to be 0,
because the source and sink are chained together into one task (upcoming
Flink releases will fix this behavior).
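If you want the dashboard counts to show up while debugging, one option is to switch off operator chaining so that the source and the sink run as separate tasks; records then travel between tasks, and the per-task counts in the UI should no longer stay at 0. The sketch below assumes StreamExecutionEnvironment.disableOperatorChaining() and uses env.fromElements(...) as a stand-in for the Kafka source so it runs without a broker; the class name UnchainedJob is hypothetical. Chaining exists to avoid serialization and handoff between operators, so treat this as a debugging aid, not a production setting.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical debugging job: same shape as the Kafka job, but with chaining disabled.
public class UnchainedJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep Flink from fusing source and sink into one task
        env.disableOperatorChaining();

        // Stand-in for the FlinkKafkaConsumer09 source; print() stands in for StringLogSink
        env.fromElements("a", "b", "c").print();

        env.execute("unchained-sketch");
    }
}
```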

I assume the "StringLogSink" is logging all incoming events. How do you do
that? Using slf4j, or System.out.println?
I'm asking to make sure you're looking at the right place to capture the
output. It will be in the YARN containers.

Regards,
Robert


On Tue, Apr 26, 2016 at 8:34 PM, Dominik Choma <do...@gmail.com> wrote:


Re: Consuming Messages from Kafka

Posted by Dominik Choma <do...@gmail.com>.
Hi,

You can check whether any messages are flowing through the dataflow in the Flink web dashboard:
https://flink.apache.org/img/blog/new-dashboard-screenshot.png



Dominik Choma

> Message written by Conlin, Joshua [USA] <co...@bah.com> on 26 Apr 2016, at 20:16:
> 
> There are messages being sent to Kafka on that topic, I just never see anything in Flink.  Any help/insight you could provide would be greatly appreciated.  If it makes a difference this is running on YARN.  Also, here’s what I see in the logs: