Posted to user@flink.apache.org by Robert Cullen <ci...@gmail.com> on 2021/09/22 15:37:16 UTC

Unbounded Kafka Source

I have an unbounded Kafka source that has records written to it every
second.  Instead of waiting to process the new messages, the job closes.
How do I keep the stream open?

KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        //.setStartingOffsets(OffsetsInitializer.earliest())
        //.setBounded(OffsetsInitializer.latest())
        .setUnbounded(OffsetsInitializer.latest())
        .build();




-- 
Robert Cullen
240-475-4490

Re: Unbounded Kafka Source

Posted by Robert Cullen <ci...@gmail.com>.
Robert,

So removing the setUnbounded(OffsetsInitializer.latest()) call fixed the issue.
Thanks!
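
For the archives, this is the builder that works: the snippet from my
first mail with the setUnbounded(...) line dropped, so no boundedness is
set and the source stays unbounded:

KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        // no setBounded()/setUnbounded() call: the source stays unbounded
        .build();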


-- 
Robert Cullen
240-475-4490

RE: Unbounded Kafka Source

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi,

If I remember right, this is actually the intended behaviour (see the sketch below):

In batch mode: use .setBounded(…)
In streaming mode, source that should finish at a set offset: use .setUnbounded(…)
In streaming mode, source that never finishes: don't set a final offset at all (neither .setBounded(…) nor .setUnbounded(…))

I might be mistaken …
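
A quick sketch of the three variants, reusing the builder from the
original mail (untested, so please double-check against the docs):

KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        // batch mode, job finishes at the given offsets:
        //.setBounded(OffsetsInitializer.latest())
        // streaming mode, but still finishes at the given offsets:
        //.setUnbounded(OffsetsInitializer.latest())
        // streaming mode that never finishes: leave both calls out
        .build();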

Thias



Re: Unbounded Kafka Source

Posted by Robert Metzger <rm...@apache.org>.
Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be
unbounded.

From reading the code, it seems that setting setUnbounded(latest) should
not trigger the behavior you mention ... but the Flink docs are not
clearly written [1]: they say you can make a Kafka source bounded by
calling "setUnbounded", which is weird, because "setUnbounded" should not
make something bounded?!

Are there any log messages from the Source that can give us any hints?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness
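
To make the first suggestion concrete, here is a minimal, untested
sketch of the source with no boundedness set at all, wired into a
streaming job (FluentdMessage and FluentdRecordDeserializer are the
classes from your snippet):

import java.util.Arrays;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<FluentdMessage> dataSource = KafkaSource
        .<FluentdMessage>builder()
        .setBootstrapServers(kafkaServer)
        .setTopics(Arrays.asList("fluentd"))
        .setGroupId("")
        .setDeserializer(new FluentdRecordDeserializer())
        // neither setBounded() nor setUnbounded(): defaults to unbounded
        .build();

DataStream<FluentdMessage> stream =
        env.fromSource(dataSource, WatermarkStrategy.noWatermarks(),
                "Kafka Source");

env.execute();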
