Posted to user@flink.apache.org by bat man <ti...@gmail.com> on 2020/07/21 06:00:13 UTC

Handle idle kafka source in Flink 1.9

Hello,

I have a pipeline which consumes data from a Kafka source. Since the topic
is partitioned by device_id, when a group of devices goes down some
partitions will not receive a normal flow of data.
I understand from the documentation here [1] that in Flink 1.11 one can
declare the source idle:
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

How can I handle this in 1.9? I am using AWS EMR, and EMR doesn't have a
release with the latest Flink version yet.

One way I can think of is to trigger watermark generation every 10 minutes
or so using periodic watermarks. However, this will not be foolproof; is
there a better way to handle this more dynamically?
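
To illustrate, this is roughly the wiring I have in mind with the 1.9 API
(a rough sketch only; env and stream are assumed to be in scope, and the
Tuple2 event type and the intervals are placeholders from the snippet
above):

import java.time.Duration;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Emit watermarks every 10 minutes instead of the default interval.
env.getConfig().setAutoWatermarkInterval(Duration.ofMinutes(10).toMillis());

DataStream<Tuple2<Long, String>> withTimestamps = stream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(
                Time.minutes(10)) {
            @Override
            public long extractTimestamp(Tuple2<Long, String> element) {
                return element.f0; // event time carried in the first field
            }
        });

// Note: getCurrentWatermark() still derives the watermark from the latest
// event seen, so an idle partition stays stalled no matter how often
// generation fires - hence the question.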

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector

Thanks,
Hemant

Re: Handle idle kafka source in Flink 1.9

Posted by bat man <ti...@gmail.com>.
Hello Arvid,

Thanks for the suggestion/reference and my apologies for the late reply.

With this I am able to process the data even when some topics do not have
regular data. Late data is handled via a side output, which has its own
processing path.
One challenge is handling back-fills: when I run the job on old data, the
watermark (with maxOutOfOrderness set to 10 minutes) causes the older
records to be filtered out as late data. To handle this I am thinking of
running the side input with maxOutOfOrderness set to cover the oldest
data, while the regular job keeps the normal setting.
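
Concretely, something like this is what I have in mind (a hypothetical
sketch; the Event type, its getEventTime() accessor, and the parameter
name are made up for illustration):

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Read the bound from a job parameter so the same job can run with
// 10 minutes normally and a much larger bound for back-fills.
ParameterTool params = ParameterTool.fromArgs(args);
Time maxOutOfOrderness =
    Time.minutes(params.getLong("max-out-of-orderness-min", 10));

DataStream<Event> timestamped = events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(maxOutOfOrderness) {
        @Override
        public long extractTimestamp(Event e) {
            return e.getEventTime(); // hypothetical accessor
        }
    });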

Thanks,
Hemant

On Thu, Jul 30, 2020 at 2:41 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Hemant,
>
> sorry for the late reply.
>
> You can just create your own watermark assigner and either copy the
> assigner from Flink 1.11 or take the one that we use in our trainings [1].
>
> [1]
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
>
> On Thu, Jul 23, 2020 at 8:48 PM bat man <ti...@gmail.com> wrote:
>
>> Thanks Niels for a great talk. You have covered two of my pain areas -
>> slim and broken streams - since I am dealing with device data from
>> on-prem data centers. The first option of generating fabricated
>> watermark events is fine; however, as mentioned in your talk, how are
>> you handling forwarding them to the next stream (the next Kafka topic)
>> after enrichment? Have you got a solution for this?
>>
>> -Hemant
>>
>> On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes <Ni...@basjes.nl> wrote:
>>
>>> Have a look at this presentation I gave a few weeks ago.
>>> https://youtu.be/bQmz7JOmE_4
>>>
>>> Niels Basjes
>>>
>>> On Wed, 22 Jul 2020, 08:51 bat man, <ti...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Can someone share their experience handling this?
>>>>
>>>> Thanks.
>>>>
>>>> On Tue, Jul 21, 2020 at 11:30 AM bat man <ti...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a pipeline which consumes data from a Kafka source. Since the
>>>>> topic is partitioned by device_id, when a group of devices goes down
>>>>> some partitions will not receive a normal flow of data.
>>>>> I understand from the documentation here [1] that in Flink 1.11 one
>>>>> can declare the source idle:
>>>>> WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>>>>>
>>>>> How can I handle this in 1.9? I am using AWS EMR, and EMR doesn't
>>>>> have a release with the latest Flink version yet.
>>>>>
>>>>> One way I can think of is to trigger watermark generation every 10
>>>>> minutes or so using periodic watermarks. However, this will not be
>>>>> foolproof; is there a better way to handle this more dynamically?
>>>>>
>>>>> [1] -
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>>>>>
>>>>> Thanks,
>>>>> Hemant
>>>>>
>>>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Re: Handle idle kafka source in Flink 1.9

Posted by Arvid Heise <ar...@ververica.com>.
Hi Hemant,

sorry for the late reply.

You can just create your own watermark assigner and either copy the
assigner from Flink 1.11 or take the one that we use in our trainings [1].

[1]
https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
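
For Flink 1.9 that boils down to an AssignerWithPeriodicWatermarks that
falls back to processing time when no events arrive. Roughly like this (a
simplified sketch of the idea, not the exact training code; it assumes
timestamps were already attached upstream, e.g. by the Kafka consumer):

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long maxOutOfOrderness; // e.g. 10 min in your case
    private final long idleTimeout;       // how long without events counts as idle

    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEventSeen = Long.MIN_VALUE;

    public IdleAwareAssigner(long maxOutOfOrderness, long idleTimeout) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        lastEventSeen = System.currentTimeMillis();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (lastEventSeen == Long.MIN_VALUE) {
            lastEventSeen = now; // start the idle clock on the first check
        }
        if (now - lastEventSeen > idleTimeout) {
            // No events for a while: derive the watermark from processing
            // time so idle partitions don't hold back downstream operators.
            return new Watermark(now - idleTimeout - maxOutOfOrderness);
        }
        return currentMaxTimestamp == Long.MIN_VALUE
                ? null // nothing seen yet
                : new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

To get per-partition watermarks, pass the assigner to the Kafka consumer's
assignTimestampsAndWatermarks(...) rather than to the stream. The trade-off
is that events arriving on a partition after it was declared idle may be
behind the watermark and end up as late data, so pick the idle timeout
accordingly.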

On Thu, Jul 23, 2020 at 8:48 PM bat man <ti...@gmail.com> wrote:

> Thanks Niels for a great talk. You have covered two of my pain areas -
> slim and broken streams - since I am dealing with device data from
> on-prem data centers. The first option of generating fabricated
> watermark events is fine; however, as mentioned in your talk, how are
> you handling forwarding them to the next stream (the next Kafka topic)
> after enrichment? Have you got a solution for this?
>
> -Hemant
>
> On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes <Ni...@basjes.nl> wrote:
>
>> Have a look at this presentation I gave a few weeks ago.
>> https://youtu.be/bQmz7JOmE_4
>>
>> Niels Basjes
>>
>> On Wed, 22 Jul 2020, 08:51 bat man, <ti...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> Can someone share their experience handling this?
>>>
>>> Thanks.
>>>
>>> On Tue, Jul 21, 2020 at 11:30 AM bat man <ti...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a pipeline which consumes data from a Kafka source. Since the
>>>> topic is partitioned by device_id, when a group of devices goes down
>>>> some partitions will not receive a normal flow of data.
>>>> I understand from the documentation here [1] that in Flink 1.11 one
>>>> can declare the source idle:
>>>> WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>>>>
>>>> How can I handle this in 1.9? I am using AWS EMR, and EMR doesn't
>>>> have a release with the latest Flink version yet.
>>>>
>>>> One way I can think of is to trigger watermark generation every 10
>>>> minutes or so using periodic watermarks. However, this will not be
>>>> foolproof; is there a better way to handle this more dynamically?
>>>>
>>>> [1] -
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>>>>
>>>> Thanks,
>>>> Hemant
>>>>
>>>>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Handle idle kafka source in Flink 1.9

Posted by bat man <ti...@gmail.com>.
Thanks Niels for a great talk. You have covered two of my pain areas - slim
and broken streams - since I am dealing with device data from on-prem data
centers. The first option of generating fabricated watermark events is
fine; however, as mentioned in your talk, how are you handling forwarding
them to the next stream (the next Kafka topic) after enrichment? Have you
got a solution for this?

-Hemant

On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes <Ni...@basjes.nl> wrote:

> Have a look at this presentation I gave a few weeks ago.
> https://youtu.be/bQmz7JOmE_4
>
> Niels Basjes
>
> On Wed, 22 Jul 2020, 08:51 bat man, <ti...@gmail.com> wrote:
>
>> Hi Team,
>>
>> Can someone share their experience handling this?
>>
>> Thanks.
>>
>> On Tue, Jul 21, 2020 at 11:30 AM bat man <ti...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a pipeline which consumes data from a Kafka source. Since the
>>> topic is partitioned by device_id, when a group of devices goes down
>>> some partitions will not receive a normal flow of data.
>>> I understand from the documentation here [1] that in Flink 1.11 one
>>> can declare the source idle:
>>> WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>>>
>>> How can I handle this in 1.9? I am using AWS EMR, and EMR doesn't
>>> have a release with the latest Flink version yet.
>>>
>>> One way I can think of is to trigger watermark generation every 10
>>> minutes or so using periodic watermarks. However, this will not be
>>> foolproof; is there a better way to handle this more dynamically?
>>>
>>> [1] -
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>>>
>>> Thanks,
>>> Hemant
>>>
>>>

Re: Handle idle kafka source in Flink 1.9

Posted by Niels Basjes <Ni...@basjes.nl>.
Have a look at this presentation I gave a few weeks ago.
https://youtu.be/bQmz7JOmE_4

Niels Basjes

On Wed, 22 Jul 2020, 08:51 bat man, <ti...@gmail.com> wrote:

> Hi Team,
>
> Can someone share their experiences handling this.
>
> Thanks.
>
> On Tue, Jul 21, 2020 at 11:30 AM bat man <ti...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a pipeline which consumes data from a Kafka source. Since the
>> topic is partitioned by device_id, when a group of devices goes down
>> some partitions will not receive a normal flow of data.
>> I understand from the documentation here [1] that in Flink 1.11 one
>> can declare the source idle:
>> WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>>
>> How can I handle this in 1.9? I am using AWS EMR, and EMR doesn't
>> have a release with the latest Flink version yet.
>>
>> One way I can think of is to trigger watermark generation every 10
>> minutes or so using periodic watermarks. However, this will not be
>> foolproof; is there a better way to handle this more dynamically?
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>>
>> Thanks,
>> Hemant
>>
>>

Re: Handle idle kafka source in Flink 1.9

Posted by bat man <ti...@gmail.com>.
Hi Team,

Can someone share their experience handling this?

Thanks.

On Tue, Jul 21, 2020 at 11:30 AM bat man <ti...@gmail.com> wrote:

> Hello,
>
> I have a pipeline which consumes data from a Kafka source. Since the
> topic is partitioned by device_id, when a group of devices goes down
> some partitions will not receive a normal flow of data.
> I understand from the documentation here [1] that in Flink 1.11 one
> can declare the source idle:
> WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>
> How can I handle this in 1.9? I am using AWS EMR, and EMR doesn't
> have a release with the latest Flink version yet.
>
> One way I can think of is to trigger watermark generation every 10
> minutes or so using periodic watermarks. However, this will not be
> foolproof; is there a better way to handle this more dynamically?
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> Thanks,
> Hemant
>
>