Posted to user@flink.apache.org by Shqiprim Bunjaku <sh...@gmail.com> on 2022/03/04 12:25:46 UTC
Late events during job catch up & Unbalanced sources
Hi,
I have a pipeline that processes data from Kafka Sources, and when the job
lags behind because of an upgrade or simple stop/start, there are a lot of
late records being dropped on the Window Operator.
See the pipeline attached.
The source *connector-sensor-data*, after enrichment using another Kafka
topic, is joined with *point-sensor-data*. These two sources have different
message rates, i.e. *connector-sensor-data* has 20 messages/s and
*point-sensor-data* has 2k-10k messages/s. The two streams are connected
using an EventTimeSession window. While the job is deployed and running
there is no late data, no matter the load, but after I stop it and start it
again I see late data at the beginning, until the job catches up with the
buffered data. The late data are dropped at the EventTimeSession window
(in the picture: *Window(EventTimeSessionWindows(20000), PurgingTrigger,
CoGroupWindowFunction) -> point-time-extract*).
I have seen various articles about balancing reads across Kafka partitions
and about backpressure when two streams with different rates are connected,
but I am not sure whether Flink already handles this or whether I have
misconfigured something.
Thank you!
[image: Screen Shot 2022-03-04 at 11.41.25 AM.png]
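A minimal sketch of the lateness check that decides whether a record is dropped at the window, written in plain Java with no Flink dependency. It assumes that no allowed lateness is configured and that no session is still open for the key, so the record starts a fresh session window of [timestamp, timestamp + gap). The class and method names are hypothetical; only the 20000 ms gap comes from the pipeline above.

```java
public class SessionLateness {
    // Session gap taken from EventTimeSessionWindows(20000), in milliseconds.
    static final long GAP_MS = 20_000L;
    // Assumption: allowed lateness is left at zero.
    static final long ALLOWED_LATENESS_MS = 0L;

    // A fresh session window for an event at eventTs covers [eventTs, eventTs + gap),
    // so the last timestamp it contains is eventTs + gap - 1.
    static long maxTimestamp(long eventTs) {
        return eventTs + GAP_MS - 1;
    }

    // The record counts as late once the watermark has reached the end of its window.
    static boolean isLate(long eventTs, long watermark) {
        return maxTimestamp(eventTs) + ALLOWED_LATENESS_MS <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 100_000L;
        for (long ts : new long[] {70_000L, 80_001L, 80_002L, 95_000L}) {
            System.out.println("ts=" + ts + " late=" + isLate(ts, watermark));
        }
    }
}
```

Under these assumptions a record is only dropped if the watermark is already more than one session gap ahead of the record's timestamp, which is why it matters how far the watermark runs ahead while the backlog is being read.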
Re: Late events during job catch up & Unbalanced sources
Posted by Shqiprim Bunjaku <sh...@gmail.com>.
Hey Roman,
It works using the old Kafka source.
Thank you very much!
Re: Late events during job catch up & Unbalanced sources
Posted by Roman Khachatryan <ro...@apache.org>.
Thanks Shqiprim,
You are probably facing a bug in the new Kafka source (FLIP-27) [1].
Could you please check if you have the same issue with the old Kafka source
[2]?
[1] https://issues.apache.org/jira/browse/FLINK-26018
[2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction
Regards,
Roman
Re: Late events during job catch up & Unbalanced sources
Posted by Shqiprim Bunjaku <sh...@gmail.com>.
I just enabled Unaligned Checkpoints, but the issue is the same.
[image: Screen Shot 2022-03-04 at 3.00.52 PM.png]
Re: Late events during job catch up & Unbalanced sources
Posted by Shqiprim Bunjaku <sh...@gmail.com>.
I tried both *WatermarkStrategy.forMonotonousTimestamps* and
*WatermarkStrategy.forBoundedOutOfOrderness*, but I still had the same
issue. Just to be clear, events are in order per Kafka partition and I don't
expect any out-of-order events. Also, I am not using Unaligned Checkpoints.
    KafkaSource<KeyedRecord<PointSensorData>> source =
        KafkaSource.<KeyedRecord<PointSensorData>>builder()
            .setProperties(kafkaProperties)
            .setTopics(Topics.POINT_SENSOR_DATA)
            .setGroupId(kafkaProperties.getProperty("group.id") + "-" + Topics.POINT_SENSOR_DATA)
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setDeserializer(new KafkaRecordDeserializer<>(PointSensorData.class))
            .build();

    WatermarkStrategy<KeyedRecord<PointSensorData>> objectWatermarkStrategy =
        WatermarkStrategy.<KeyedRecord<PointSensorData>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
            .withIdleness(Duration.ofMinutes(2));

    return environment.fromSource(source, objectWatermarkStrategy, Topics.POINT_SENSOR_DATA)
        .uid("point-sensor-data-source")
        .setMaxParallelism(3)
        .rescale();
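Even when every partition is in order, records can still turn out late during catch-up if the watermark is driven by whichever partition is read first. The following plain-Java simulation (no Flink dependency) is a rough sketch of that general mechanism, not a reproduction of any specific Flink bug. It compares a watermark taken from the merged stream with one taken as the minimum over per-partition watermarks. The class, the method names, and the synthetic backlog (two partitions, ten minutes of one-second records, drained one partition at a time) are assumptions for illustration. The one-minute bound and the 20000 ms gap mirror the settings above, and the watermark is simplified to advance after every record rather than periodically.

```java
import java.util.Arrays;

public class CatchUpWatermarks {
    static final long BOUND_MS = 60_000L; // mirrors forBoundedOutOfOrderness(Duration.ofMinutes(1))
    static final long GAP_MS = 20_000L;   // mirrors EventTimeSessionWindows(20000)

    // Each record is {partition, timestamp}. Returns how many records arrive
    // after the watermark has already passed the end of their session window.
    static int countLate(long[][] records, int partitions, boolean perPartition) {
        long[] maxTs = new long[partitions];
        Arrays.fill(maxTs, Long.MIN_VALUE);
        long globalMax = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long[] r : records) {
            int p = (int) r[0];
            long ts = r[1];
            // Lateness is judged against the watermark seen before this record.
            if (ts + GAP_MS - 1 <= watermark) {
                late++;
            }
            maxTs[p] = Math.max(maxTs[p], ts);
            globalMax = Math.max(globalMax, ts);
            long candidate;
            if (perPartition) {
                // Minimum over partitions; a partition without data holds the watermark back.
                long min = Long.MAX_VALUE;
                for (long m : maxTs) {
                    min = Math.min(min, m);
                }
                candidate = (min == Long.MIN_VALUE) ? Long.MIN_VALUE : min - BOUND_MS - 1;
            } else {
                // One generator over the merged stream: follows the fastest partition.
                candidate = globalMax - BOUND_MS - 1;
            }
            watermark = Math.max(watermark, candidate); // watermarks never move backwards
        }
        return late;
    }

    // Synthetic catch-up order: the whole backlog of partition 0, then partition 1, and so on.
    static long[][] drainOnePartitionAtATime(int partitions, int recordsPerPartition, long stepMs) {
        long[][] out = new long[partitions * recordsPerPartition][];
        int i = 0;
        for (int p = 0; p < partitions; p++) {
            for (int k = 0; k < recordsPerPartition; k++) {
                out[i++] = new long[] {p, k * stepMs};
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] backlog = drainOnePartitionAtATime(2, 600, 1_000L);
        System.out.println("merged watermark, late records: " + countLate(backlog, 2, false));
        System.out.println("per-partition watermark, late records: " + countLate(backlog, 2, true));
    }
}
```

With this synthetic backlog the merged variant marks 520 of the second partition's 600 records as late, while the per-partition variant marks none. Real fetches interleave partitions in batches, so the numbers in an actual job would differ.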
Re: Late events during job catch up & Unbalanced sources
Posted by Roman Khachatryan <ro...@apache.org>.
Hi Shqiprim,
Could you share how you assign timestamps and generate watermarks and
whether you are using Unaligned Checkpoints?
Regards,
Roman