You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Govindarajan Srinivasaraghavan <go...@gmail.com> on 2016/10/03 06:08:20 UTC

Using Flink

Hi,



I have few questions on how I need to model my use case in flink. Please
advise. Thanks for the help.



- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
and I have checkpoint enabled. When I look at the consumer offsets in kafka
it appears to be stagnant and there is a huge lag. But I can see my flink
program is in pace with kafka source in JMX metrics and outputs. Is there a
way to identify why the offsets are not committed to kafka?



- In my current application we custom loggers for debugging purposes. Let’s
say we want to find what’s happening for a particular user, we fire an api
request to add the custom logger for that particular user and use it for
logging along the data path. Is there a way to achieve this in flink? Are
there any global mutable parameters that I can use to achieve this
functionality?



- Can I pass on state between operators? If I need the state stored on
previous operators, how can I fetch it?



Thanks

Re: Using Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Govindarajan,

Regarding the stagnant Kakfa offsets, it’ll be helpful if you can supply more information for the following to help us identify the cause:
1. What is your checkpointing interval set to?
2. Did you happen to have set the “max.partition.fetch.bytes” property in the properties given to FlinkKafkaConsumer? I’m suspecting with some recent changes to the offset committing, large fetches can also affect when offsets are committed to Kafka.
3. I’m assuming that you’ve built the Kafka connector from source. Could you tell which commit it was built on?

If you could, you can also reply with the taskmanager logs (or via private email) so we can check in detail, that would definitely be helpful!

Best Regards,
Gordon


On October 4, 2016 at 3:51:59 PM, Till Rohrmann (trohrmann@apache.org) wrote:

Hi Govindarajan,

you can broadcast the stream with debug logger information by calling `stream.broadcast`. Then every stream record should be send to all sub-tasks of the downstream operator.

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan <go...@gmail.com> wrote:
Hi Gordon,

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and I have checkpoint enabled. When I look at the consumer offsets in kafka it appears to be stagnant and there is a huge lag. But I can see my flink program is in pace with kafka source in JMX metrics and outputs. Is there a way to identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to the offset committing for Kafka 0.9 consumer, so identifying the exact commit will help clarify whether the recent change introduced any new problems. Also, what is your configured checkpoint interval? When checkpointing is enabled, the Kafka consumer only commits to Kafka when checkpoints are completed. So, offsets in Kafka are not updated until the next checkpoint is triggered.

I am using 1.2-SNAPSHOT
'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version: '1.2-SNAPSHOT'
'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version: '1.2-SNAPSHOT'

- In my current application we custom loggers for debugging purposes. Let’s say we want to find what’s happening for a particular user, we fire an api request to add the custom logger for that particular user and use it for logging along the data path. Is there a way to achieve this in flink? Are there any global mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will need to change configuration / behaviour of a specific Flink operator at runtime. To my knowledge, the best way to do this in Flink right now is to translate your original logging-trigger api requests to a stream of events fed to Flink. This stream of events will then basically be changes of your user logger behaviour, and your operators can change its logging behaviour according to this stream.

I can send the changes as streams, but I need this change for all the operators in my pipeline. Instead of using coflatmap at each operator to combine the streams, is there a way to send a change to all the operators?

- Can I pass on state between operators? If I need the state stored on previous operators, how can I fetch it?
I don’t think this is possible.
Fine, thanks.

Thanks.

On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and I have checkpoint enabled. When I look at the consumer offsets in kafka it appears to be stagnant and there is a huge lag. But I can see my flink program is in pace with kafka source in JMX metrics and outputs. Is there a way to identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to the offset committing for Kafka 0.9 consumer, so identifying the exact commit will help clarify whether the recent change introduced any new problems. Also, what is your configured checkpoint interval? When checkpointing is enabled, the Kafka consumer only commits to Kafka when checkpoints are completed. So, offsets in Kafka are not updated until the next checkpoint is triggered.

- In my current application we custom loggers for debugging purposes. Let’s say we want to find what’s happening for a particular user, we fire an api request to add the custom logger for that particular user and use it for logging along the data path. Is there a way to achieve this in flink? Are there any global mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will need to change configuration / behaviour of a specific Flink operator at runtime. To my knowledge, the best way to do this in Flink right now is to translate your original logging-trigger api requests to a stream of events fed to Flink. This stream of events will then basically be changes of your user logger behaviour, and your operators can change its logging behaviour according to this stream.

- Can I pass on state between operators? If I need the state stored on previous operators, how can I fetch it?

I don’t think this is possible.


Best Regards,
Gordon


On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (govindraghvan@gmail.com) wrote:

Hi,

 

I have few questions on how I need to model my use case in flink. Please advise. Thanks for the help.

 

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and I have checkpoint enabled. When I look at the consumer offsets in kafka it appears to be stagnant and there is a huge lag. But I can see my flink program is in pace with kafka source in JMX metrics and outputs. Is there a way to identify why the offsets are not committed to kafka?

 

- In my current application we custom loggers for debugging purposes. Let’s say we want to find what’s happening for a particular user, we fire an api request to add the custom logger for that particular user and use it for logging along the data path. Is there a way to achieve this in flink? Are there any global mutable parameters that I can use to achieve this functionality?

 

- Can I pass on state between operators? If I need the state stored on previous operators, how can I fetch it?

 

Thanks




Re: Using Flink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Govindarajan,

you can broadcast the stream with debug logger information by calling
`stream.broadcast`. Then every stream record should be send to all
sub-tasks of the downstream operator.

Cheers,
Till

On Mon, Oct 3, 2016 at 5:13 PM, Govindarajan Srinivasaraghavan <
govindraghvan@gmail.com> wrote:

> Hi Gordon,
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
> On which commit was your Kafka connector built? There was a recent change
> to the offset committing for Kafka 0.9 consumer, so identifying the exact
> commit will help clarify whether the recent change introduced any new
> problems. Also, what is your configured checkpoint interval? When
> checkpointing is enabled, the Kafka consumer only commits to Kafka when
> checkpoints are completed. So, offsets in Kafka are not updated until the
> next checkpoint is triggered.
>
> I am using 1.2-SNAPSHOT
> 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version:
> '1.2-SNAPSHOT'
> 'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version:
> '1.2-SNAPSHOT'
>
> - In my current application we custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
> I’m not sure if I understand the use case correctly, but it seems like you
> will need to change configuration / behaviour of a specific Flink operator
> at runtime. To my knowledge, the best way to do this in Flink right now is
> to translate your original logging-trigger api requests to a stream of
> events fed to Flink. This stream of events will then basically be changes
> of your user logger behaviour, and your operators can change its logging
> behaviour according to this stream.
>
> I can send the changes as streams, but I need this change for all the
> operators in my pipeline. Instead of using coflatmap at each operator to
> combine the streams, is there a way to send a change to all the operators?
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
> I don’t think this is possible.
> Fine, thanks.
>
> Thanks.
>
> On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi!
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>> On which commit was your Kafka connector built? There was a recent change
>> to the offset committing for Kafka 0.9 consumer, so identifying the exact
>> commit will help clarify whether the recent change introduced any new
>> problems. Also, what is your configured checkpoint interval? When
>> checkpointing is enabled, the Kafka consumer only commits to Kafka when
>> checkpoints are completed. So, offsets in Kafka are not updated until the
>> next checkpoint is triggered.
>>
>> - In my current application we custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>> I’m not sure if I understand the use case correctly, but it seems like
>> you will need to change configuration / behaviour of a specific Flink
>> operator at runtime. To my knowledge, the best way to do this in Flink
>> right now is to translate your original logging-trigger api requests to
>> a stream of events fed to Flink. This stream of events will then basically
>> be changes of your user logger behaviour, and your operators can change its
>> logging behaviour according to this stream.
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>> I don’t think this is possible.
>>
>>
>> Best Regards,
>> Gordon
>>
>>
>> On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (
>> govindraghvan@gmail.com) wrote:
>>
>> Hi,
>>
>>
>>
>> I have few questions on how I need to model my use case in flink. Please
>> advise. Thanks for the help.
>>
>>
>>
>> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka
>> source and I have checkpoint enabled. When I look at the consumer offsets
>> in kafka it appears to be stagnant and there is a huge lag. But I can see
>> my flink program is in pace with kafka source in JMX metrics and outputs.
>> Is there a way to identify why the offsets are not committed to kafka?
>>
>>
>>
>> - In my current application we custom loggers for debugging purposes.
>> Let’s say we want to find what’s happening for a particular user, we fire
>> an api request to add the custom logger for that particular user and use it
>> for logging along the data path. Is there a way to achieve this in flink?
>> Are there any global mutable parameters that I can use to achieve this
>> functionality?
>>
>>
>>
>> - Can I pass on state between operators? If I need the state stored on
>> previous operators, how can I fetch it?
>>
>>
>>
>> Thanks
>>
>>
>

Re: Using Flink

Posted by Govindarajan Srinivasaraghavan <go...@gmail.com>.
Hi Gordon,

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
and I have checkpoint enabled. When I look at the consumer offsets in kafka
it appears to be stagnant and there is a huge lag. But I can see my flink
program is in pace with kafka source in JMX metrics and outputs. Is there a
way to identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change
to the offset committing for Kafka 0.9 consumer, so identifying the exact
commit will help clarify whether the recent change introduced any new
problems. Also, what is your configured checkpoint interval? When
checkpointing is enabled, the Kafka consumer only commits to Kafka when
checkpoints are completed. So, offsets in Kafka are not updated until the
next checkpoint is triggered.

I am using 1.2-SNAPSHOT
'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version:
'1.2-SNAPSHOT'
'org.apache.flink', name: 'flink-connector-kafka-0.9_2.11', version:
'1.2-SNAPSHOT'

- In my current application we custom loggers for debugging purposes. Let’s
say we want to find what’s happening for a particular user, we fire an api
request to add the custom logger for that particular user and use it for
logging along the data path. Is there a way to achieve this in flink? Are
there any global mutable parameters that I can use to achieve this
functionality?

I’m not sure if I understand the use case correctly, but it seems like you
will need to change configuration / behaviour of a specific Flink operator
at runtime. To my knowledge, the best way to do this in Flink right now is
to translate your original logging-trigger api requests to a stream of
events fed to Flink. This stream of events will then basically be changes
of your user logger behaviour, and your operators can change its logging
behaviour according to this stream.

I can send the changes as streams, but I need this change for all the
operators in my pipeline. Instead of using coflatmap at each operator to
combine the streams, is there a way to send a change to all the operators?

- Can I pass on state between operators? If I need the state stored on
previous operators, how can I fetch it?
I don’t think this is possible.
Fine, thanks.

Thanks.

On Mon, Oct 3, 2016 at 12:23 AM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi!
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
> On which commit was your Kafka connector built? There was a recent change
> to the offset committing for Kafka 0.9 consumer, so identifying the exact
> commit will help clarify whether the recent change introduced any new
> problems. Also, what is your configured checkpoint interval? When
> checkpointing is enabled, the Kafka consumer only commits to Kafka when
> checkpoints are completed. So, offsets in Kafka are not updated until the
> next checkpoint is triggered.
>
> - In my current application we custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
> I’m not sure if I understand the use case correctly, but it seems like you
> will need to change configuration / behaviour of a specific Flink operator
> at runtime. To my knowledge, the best way to do this in Flink right now is
> to translate your original logging-trigger api requests to a stream of
> events fed to Flink. This stream of events will then basically be changes
> of your user logger behaviour, and your operators can change its logging
> behaviour according to this stream.
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
>
> I don’t think this is possible.
>
>
> Best Regards,
> Gordon
>
>
> On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (
> govindraghvan@gmail.com) wrote:
>
> Hi,
>
>
>
> I have few questions on how I need to model my use case in flink. Please
> advise. Thanks for the help.
>
>
>
> - I'm currently running my flink program on 1.2 SNAPSHOT with kafka source
> and I have checkpoint enabled. When I look at the consumer offsets in kafka
> it appears to be stagnant and there is a huge lag. But I can see my flink
> program is in pace with kafka source in JMX metrics and outputs. Is there a
> way to identify why the offsets are not committed to kafka?
>
>
>
> - In my current application we custom loggers for debugging purposes.
> Let’s say we want to find what’s happening for a particular user, we fire
> an api request to add the custom logger for that particular user and use it
> for logging along the data path. Is there a way to achieve this in flink?
> Are there any global mutable parameters that I can use to achieve this
> functionality?
>
>
>
> - Can I pass on state between operators? If I need the state stored on
> previous operators, how can I fetch it?
>
>
>
> Thanks
>
>

Re: Using Flink

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi!

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and I have checkpoint enabled. When I look at the consumer offsets in kafka it appears to be stagnant and there is a huge lag. But I can see my flink program is in pace with kafka source in JMX metrics and outputs. Is there a way to identify why the offsets are not committed to kafka?

On which commit was your Kafka connector built? There was a recent change to the offset committing for Kafka 0.9 consumer, so identifying the exact commit will help clarify whether the recent change introduced any new problems. Also, what is your configured checkpoint interval? When checkpointing is enabled, the Kafka consumer only commits to Kafka when checkpoints are completed. So, offsets in Kafka are not updated until the next checkpoint is triggered.

- In my current application we custom loggers for debugging purposes. Let’s say we want to find what’s happening for a particular user, we fire an api request to add the custom logger for that particular user and use it for logging along the data path. Is there a way to achieve this in flink? Are there any global mutable parameters that I can use to achieve this functionality?

I’m not sure if I understand the use case correctly, but it seems like you will need to change configuration / behaviour of a specific Flink operator at runtime. To my knowledge, the best way to do this in Flink right now is to translate your original logging-trigger api requests to a stream of events fed to Flink. This stream of events will then basically be changes of your user logger behaviour, and your operators can change its logging behaviour according to this stream.

- Can I pass on state between operators? If I need the state stored on previous operators, how can I fetch it?

I don’t think this is possible.


Best Regards,
Gordon


On October 3, 2016 at 2:08:31 PM, Govindarajan Srinivasaraghavan (govindraghvan@gmail.com) wrote:

Hi,

 

I have few questions on how I need to model my use case in flink. Please advise. Thanks for the help.

 

- I'm currently running my flink program on 1.2 SNAPSHOT with kafka source and I have checkpoint enabled. When I look at the consumer offsets in kafka it appears to be stagnant and there is a huge lag. But I can see my flink program is in pace with kafka source in JMX metrics and outputs. Is there a way to identify why the offsets are not committed to kafka?

 

- In my current application we custom loggers for debugging purposes. Let’s say we want to find what’s happening for a particular user, we fire an api request to add the custom logger for that particular user and use it for logging along the data path. Is there a way to achieve this in flink? Are there any global mutable parameters that I can use to achieve this functionality?

 

- Can I pass on state between operators? If I need the state stored on previous operators, how can I fetch it?

 

Thanks