Posted to user@flink.apache.org by Shqiprim Bunjaku <sh...@gmail.com> on 2022/03/04 12:25:46 UTC

Late events during job catch up & Unbalanced sources

Hi,

I have a pipeline that processes data from Kafka sources. When the job
lags behind because of an upgrade or a simple stop/start, a lot of late
records are dropped at the window operator.

See the pipeline attached.

The source *connector-sensor-data*, after enrichment using another Kafka
topic, is joined with *point-sensor-data*. These two sources have different
message rates, i.e. *connector-sensor-data* has 20 messages/s and
*point-sensor-data* has 2k-10k messages/s. The two streams are joined using
an EventTimeSessionWindows window. While the job is deployed and running,
there is no late data regardless of the load, but after I stop and start it
again, I see late data at the beginning until the job catches up with the
buffered data. The late data are dropped at the session window operator (in
the picture: *Window(EventTimeSessionWindows(20000), PurgingTrigger,
CoGroupWindowFunction) -> point-time-extract*).

I have seen several articles about balanced reads across Kafka partitions
and about backpressure when two streams with different rates are connected,
but I am not sure whether Flink already handles this or whether I have
misconfigured something.

Thank you!
[image: Screen Shot 2022-03-04 at 11.41.25 AM.png]
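
For readers following the thread: an event-time window drops a record when the watermark has already passed the window that the record would belong to. The sketch below is a simplified plain-Java model of that check, not Flink code, and the class and method names are made up for illustration. It assumes the 20-second session gap from EventTimeSessionWindows(20000) and no allowed lateness. It also ignores session merging, which can rescue a record that falls into a session that is still open.

```java
/** Simplified model of the lateness check for an event-time session window (not Flink code). */
final class SessionLatenessModel {
    /** Session gap in milliseconds, matching EventTimeSessionWindows(20000) from the thread. */
    static final long GAP_MS = 20_000L;

    /**
     * A record with timestamp ts opens a session window [ts, ts + GAP_MS).
     * It is treated as late when the last timestamp of that window
     * (ts + GAP_MS - 1) is not after the current watermark.
     */
    static boolean isLate(long ts, long watermark) {
        long windowMaxTimestamp = ts + GAP_MS - 1;
        return windowMaxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 100_000L;
        // Window [90000, 110000) still ends after the watermark: kept.
        System.out.println(isLate(90_000L, watermark));
        // Window [70000, 90000) has already been passed by the watermark: dropped.
        System.out.println(isLate(70_000L, watermark));
    }
}
```

Under this model, a record is only safe while the watermark trails its timestamp by less than the session gap, which is why a watermark that jumps ahead during catch-up can turn a whole backlog into late data.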

Re: Late events during job catch up & Unbalanced sources

Posted by Shqiprim Bunjaku <sh...@gmail.com>.
Hey Roman,

It works with the old Kafka source.

Thank you very much!

On Fri, Mar 4, 2022 at 5:25 PM Roman Khachatryan <ro...@apache.org> wrote:

> Thanks Shqiprim,
>
> You are probably facing a bug in the new Kafka source (FLIP-27) [1].
> Could you please check if you have the same issue with the old Kafka
> source [2]?
>
> [1] https://issues.apache.org/jira/browse/FLINK-26018
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction
>
> Regards,
> Roman

Re: Late events during job catch up & Unbalanced sources

Posted by Roman Khachatryan <ro...@apache.org>.
Thanks Shqiprim,

You are probably facing a bug in the new Kafka source (FLIP-27) [1].
Could you please check if you have the same issue with the old Kafka source
[2]?

[1] https://issues.apache.org/jira/browse/FLINK-26018
[2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction

Regards,
Roman
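
The thread does not spell out the mechanism behind the linked ticket, so the following is only a hypothetical illustration of one way a source could produce late records on restart: the watermark advances based on the first partition before a second, lagging partition has been registered. It is a plain-Java model, not Flink code. The class name, the method names, and the `holdBack` flag are assumptions made for this sketch. The 1-minute bound is taken from the watermark strategy posted in this thread.

```java
/** Hypothetical model: one partition is registered after the watermark has already advanced. */
final class LateRegistrationModel {
    /** Out-of-orderness bound in ms, the same 1-minute bound as in the thread. */
    static final long BOUND_MS = 60_000L;

    /** Watermark implied by the highest timestamp seen; MIN_VALUE means "nothing seen yet". */
    static long watermarkFor(long maxTimestamp) {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - BOUND_MS - 1;
    }

    /**
     * The first partition is read completely before the second one is registered.
     * Returns how many records of the second partition arrive at or behind the watermark.
     * holdBack = true: no watermark is emitted until the second partition has produced a record.
     */
    static int countLate(long[] first, long[] second, boolean holdBack) {
        long maxFirst = Long.MIN_VALUE;
        for (long ts : first) {
            maxFirst = Math.max(maxFirst, ts);
        }
        long emitted = holdBack ? Long.MIN_VALUE : watermarkFor(maxFirst);
        long maxSecond = Long.MIN_VALUE;
        int late = 0;
        for (long ts : second) {
            if (ts <= emitted) {
                late++;
            }
            maxSecond = Math.max(maxSecond, ts);
            // Both partitions are known now: combine them with min, and never move backwards.
            emitted = Math.max(emitted, watermarkFor(Math.min(maxFirst, maxSecond)));
        }
        return late;
    }
}
```

For example, with a first partition at 300000, 310000, 320000 ms and a backlog partition at 0, 10000, 20000 ms, `countLate(first, second, false)` counts all three backlog records as late, while `countLate(first, second, true)` counts none.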


On Fri, Mar 4, 2022 at 3:02 PM Shqiprim Bunjaku <sh...@gmail.com>
wrote:

> Just enabled Unaligned Checkpoints, but the same issue.
>
>
>
> [image: Screen Shot 2022-03-04 at 3.00.52 PM.png]

Re: Late events during job catch up & Unbalanced sources

Posted by Shqiprim Bunjaku <sh...@gmail.com>.
I just enabled Unaligned Checkpoints, but the issue is the same.



[image: Screen Shot 2022-03-04 at 3.00.52 PM.png]

On Fri, Mar 4, 2022 at 2:43 PM Shqiprim Bunjaku <sh...@gmail.com>
wrote:

>
>
> I used *WatermarkStrategy.forMonotonousTimestamps* and
> *WatermarkStrategy.forBoundedOutOfOrderness*, but I still had the same issue.
> Just to be clear, events are in order per Kafka partition and I don't expect
> any out-of-order events. Also, I am not using Unaligned Checkpoints.
>
>
> KafkaSource<KeyedRecord<PointSensorData>> source = KafkaSource.<KeyedRecord<PointSensorData>>builder()
>         .setProperties(kafkaProperties)
>         .setTopics(Topics.POINT_SENSOR_DATA)
>         .setGroupId(kafkaProperties.getProperty("group.id") + "-" + Topics.POINT_SENSOR_DATA)
>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .setDeserializer(new KafkaRecordDeserializer<>(PointSensorData.class))
>         .build();
>
> WatermarkStrategy<KeyedRecord<PointSensorData>> objectWatermarkStrategy = WatermarkStrategy.<KeyedRecord<PointSensorData>>
>                 forBoundedOutOfOrderness(Duration.ofMinutes(1))
>         .withIdleness(Duration.ofMinutes(2));
>
> return environment.fromSource(source, objectWatermarkStrategy, Topics.POINT_SENSOR_DATA)
>         .uid("point-sensor-data-source")
>         .setMaxParallelism(3)
>         .rescale();
>

Re: Late events during job catch up & Unbalanced sources

Posted by Shqiprim Bunjaku <sh...@gmail.com>.
I used *WatermarkStrategy.forMonotonousTimestamps* and
*WatermarkStrategy.forBoundedOutOfOrderness*, but I still had the same issue.
Just to be clear, events are in order per Kafka partition and I don't expect
any out-of-order events. Also, I am not using Unaligned Checkpoints.


// Kafka source for the point-sensor-data topic, built with the new KafkaSource API.
KafkaSource<KeyedRecord<PointSensorData>> source = KafkaSource.<KeyedRecord<PointSensorData>>builder()
        .setProperties(kafkaProperties)
        .setTopics(Topics.POINT_SENSOR_DATA)
        .setGroupId(kafkaProperties.getProperty("group.id") + "-" + Topics.POINT_SENSOR_DATA)
        // Start from the committed offsets; use the earliest offset if none are committed.
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setDeserializer(new KafkaRecordDeserializer<>(PointSensorData.class))
        .build();

// Allow up to 1 minute of out-of-orderness; declare the source idle after 2 minutes without records.
WatermarkStrategy<KeyedRecord<PointSensorData>> objectWatermarkStrategy = WatermarkStrategy
        .<KeyedRecord<PointSensorData>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
        .withIdleness(Duration.ofMinutes(2));

return environment.fromSource(source, objectWatermarkStrategy, Topics.POINT_SENSOR_DATA)
        .uid("point-sensor-data-source")
        .setMaxParallelism(3)
        .rescale();
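
To make the effect of such a watermark strategy during catch-up more concrete, here is a small plain-Java model. It is not Flink code, and all names in it are hypothetical. It assumes the bounded-out-of-orderness rule "watermark = highest timestamp seen - bound - 1 ms" with the 1-minute bound used above. Two partitions, each in order but 5 minutes apart, are read alternately. The model compares a watermark derived from the interleaved stream as a whole with a watermark taken as the minimum over the per-partition watermarks. A record counts as late here when its timestamp is at or behind the watermark on arrival; whether a window would actually drop it also depends on the window length.

```java
/** Simplified model of bounded-out-of-orderness watermarks over two partitions (not Flink code). */
final class CatchUpWatermarkModel {
    /** Out-of-orderness bound in ms, matching forBoundedOutOfOrderness(Duration.ofMinutes(1)). */
    static final long BOUND_MS = 60_000L;

    /** Watermark implied by the highest timestamp seen; MIN_VALUE means "nothing seen yet". */
    static long watermarkFor(long maxTimestamp) {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - BOUND_MS - 1;
    }

    /** Current watermark, derived per partition (minimum) or from the interleaved stream (maximum). */
    static long currentWatermark(long[] maxSeen, boolean perPartition) {
        long basis = perPartition
                ? Math.min(maxSeen[0], maxSeen[1])
                : Math.max(maxSeen[0], maxSeen[1]);
        return watermarkFor(basis);
    }

    /**
     * Reads the two partitions alternately and counts the records whose timestamp is
     * at or behind the watermark at the moment they arrive.
     */
    static int countLate(long[] p0, long[] p1, boolean perPartition) {
        long[][] partitions = {p0, p1};
        long[] maxSeen = {Long.MIN_VALUE, Long.MIN_VALUE};
        int late = 0;
        for (int i = 0; i < Math.max(p0.length, p1.length); i++) {
            for (int p = 0; p < partitions.length; p++) {
                if (i >= partitions[p].length) {
                    continue; // this partition is exhausted
                }
                long ts = partitions[p][i];
                if (ts <= currentWatermark(maxSeen, perPartition)) {
                    late++;
                }
                maxSeen[p] = Math.max(maxSeen[p], ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] backlog = {0L, 10_000L, 20_000L};         // partition still catching up
        long[] ahead = {300_000L, 310_000L, 320_000L};   // partition 5 minutes ahead
        System.out.println(countLate(backlog, ahead, false)); // prints 2
        System.out.println(countLate(backlog, ahead, true));  // prints 0
    }
}
```

In this model, in-order data per partition is not enough on its own: once the watermark follows the partition that is furthest ahead, the backlog of the other partition falls behind it, even with a 1-minute bound.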


On Fri, Mar 4, 2022 at 2:26 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Shqiprim,
>
> Could you share how you assign timestamps and generate watermarks and
> whether you are using Unaligned Checkpoints?
>
> Regards,
> Roman

Re: Late events during job catch up & Unbalanced sources

Posted by Roman Khachatryan <ro...@apache.org>.
Hi Shqiprim,

Could you share how you assign timestamps and generate watermarks and
whether you are using Unaligned Checkpoints?

Regards,
Roman

