Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/01 07:20:20 UTC

KafkaSource vs FlinkKafkaConsumer010

Hello all

I am confused.
What is the difference between KafkaSource as defined in:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
and FlinkKafkaConsumer010 as defined in
https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html

When should I use which?

Regards Hans

Re: KafkaSource vs FlinkKafkaConsumer010

Posted by David Anderson <da...@apache.org>.
Before Kafka introduced its universal client, Flink had version-specific
connectors, e.g., for versions 0.8, 0.9, 0.10, and 0.11. Those were
eventually removed in favor of FlinkKafkaConsumer, which is (or was)
backward compatible with Kafka versions as old as 0.10.
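
For illustration, a minimal sketch of that intermediate step: the universal
FlinkKafkaConsumer takes the same constructor arguments FlinkKafkaConsumer010
took (topic, deserialization schema, Kafka properties) and is attached to the
job with addSource(). It assumes Flink 1.14 with the flink-connector-kafka
dependency, a broker at localhost:9092, and a topic named input-topic; the
class and helper names are hypothetical. The class is deprecated in 1.14, so
expect a deprecation warning when compiling.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalConsumerSketch {

    // Hypothetical helper: same arguments FlinkKafkaConsumer010 took (topic, schema,
    // properties), but using the universal, version-agnostic consumer class.
    static FlinkKafkaConsumer<String> buildConsumer(String brokers, String groupId, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        // Commit offsets back to Kafka when a Flink checkpoint completes.
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Legacy SourceFunction-based consumers are attached with addSource().
        DataStream<String> lines =
                env.addSource(buildConsumer("localhost:9092", "my-group", "input-topic"));
        lines.print();
        env.execute("FlinkKafkaConsumer sketch");
    }
}
```

Constructing the consumer does not contact the broker; the connection is only
made once the job runs.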

FlinkKafkaConsumer itself was deprecated in Flink 1.14 in favor of
KafkaSource, which implements the unified batch/streaming interface defined
in FLIP-27.
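
A minimal sketch of what the FLIP-27 interface looks like from user code,
assuming Flink 1.14 with flink-connector-kafka, a broker at localhost:9092,
and a topic named input-topic (all placeholders, as is the helper
buildSource). A KafkaSource is attached with fromSource() plus a
WatermarkStrategy instead of addSource(), and calling setBounded(...) on the
builder turns the same source definition into a bounded one, which is what
lets one source serve both batch and streaming jobs.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {

    // Hypothetical helper. With bounded == true the source stops at the offsets that
    // are latest when the job starts, so the same definition can back a batch-style job.
    static KafkaSource<String> buildSource(String brokers, boolean bounded) {
        KafkaSourceBuilder<String> builder = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema());
        if (bounded) {
            builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FLIP-27 sources are attached with fromSource() and an explicit WatermarkStrategy.
        DataStream<String> lines = env.fromSource(
                buildSource("localhost:9092", false),
                WatermarkStrategy.noWatermarks(),
                "Kafka Source");
        lines.print();
        env.execute("KafkaSource sketch");
    }
}
```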

Regards,
David

On Tue, Feb 1, 2022 at 9:21 AM Francesco Guardiani <fr...@ververica.com>
wrote:

Re: KafkaSource vs FlinkKafkaConsumer010

Posted by Francesco Guardiani <fr...@ververica.com>.
I think the FlinkKafkaConsumer010 you're talking about is the old source
API. You should use only KafkaSource now, as it uses the new source
infrastructure.
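
One way to see the "old source API" versus "new source infrastructure"
distinction concretely is to check which interface each class implements. A
minimal sketch, assuming Flink 1.14 with flink-connector-kafka on the
classpath; the universal FlinkKafkaConsumer stands in for
FlinkKafkaConsumer010, which no longer ships with 1.14, and the class name is
hypothetical. It only uses reflection, so no Kafka broker is needed.

```java
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceApiCheck {

    public static void main(String[] args) {
        // Old source API: legacy consumers are SourceFunction implementations.
        System.out.println("FlinkKafkaConsumer implements SourceFunction: "
                + SourceFunction.class.isAssignableFrom(FlinkKafkaConsumer.class));
        System.out.println("FlinkKafkaConsumer implements Source: "
                + Source.class.isAssignableFrom(FlinkKafkaConsumer.class));
        // New source infrastructure (FLIP-27): KafkaSource implements Source instead.
        System.out.println("KafkaSource implements SourceFunction: "
                + SourceFunction.class.isAssignableFrom(KafkaSource.class));
        System.out.println("KafkaSource implements Source: "
                + Source.class.isAssignableFrom(KafkaSource.class));
    }
}
```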

On Tue, Feb 1, 2022 at 9:02 AM HG <ha...@gmail.com> wrote:

> Hello Francesco
> Perhaps I copied the wrong link of 1.2.
> But there is also
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>
> It seems there are 2 ways to use Kafka
>
> KafkaSource<String> source = KafkaSource.<String>builder()
>     .setBootstrapServers(brokers)
>     .setTopics("input-topic")
>     .setGroupId("my-group")
>     .setStartingOffsets(OffsetsInitializer.earliest())
>     .setValueOnlyDeserializer(new SimpleStringSchema())
>     .build();
>
> And like this:
>
>         Properties kafkaProperties = new Properties();
>         kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
>         kafkaProperties.put("group.id",kafkaGroupID);
>         kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
>         FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
>         kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>
>
> There is even a FlinkKafkaConsumer011
>
> Which one is preferable ? Or have they different use cases?
>
> Regards Hans
>
>
> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
> francesco@ververica.com>:
>
>> The latter link you posted refers to a very old flink release. You shold
>> use the first link, which refers to latest release
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:20 AM HG <ha...@gmail.com> wrote:
>>
>>> Hello all
>>>
>>> I am confused.
>>> What is the difference between KafkaSource as defined in :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>>> and FlinkKafkaConsumer010 as defined in
>>> https://nightlies.apache.org/flink/flink-docs-release-
>>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>>>
>>> When should I use which?
>>>
>>> Regards Hans
>>>
>>

Re: KafkaSource vs FlinkKafkaConsumer010

Posted by HG <ha...@gmail.com>.
Hello Francesco
Perhaps I copied the wrong link (the 1.2 one).
But there is also
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html

It seems there are two ways to consume from Kafka:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

And like this:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", kafkaBootstrapServers);
kafkaProperties.setProperty("group.id", kafkaGroupID);
kafkaProperties.setProperty("auto.offset.reset", kafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer =
    new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
// Commit offsets back to Kafka when a checkpoint completes.
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

There is even a FlinkKafkaConsumer011.

Which one is preferable? Or do they have different use cases?

Regards Hans
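
For comparison, a sketch of how the settings in the FlinkKafkaConsumer010
snippet above could be carried over to KafkaSource, assuming Flink 1.14 with
flink-connector-kafka. The helper name fromLegacySettings and the example
values are hypothetical. OffsetsInitializer.committedOffsets(...) starts from
the consumer group's committed offsets and falls back to the given reset
strategy, which mirrors the legacy consumer's default group-offsets startup
with auto.offset.reset. The commit.offsets.on.checkpoint property is, I
believe, already true by default in 1.14 and is set here only to make the
mapping explicit.

```java
import java.util.Locale;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class LegacySettingsMigration {

    // Hypothetical helper taking the same inputs as the FlinkKafkaConsumer010 snippet.
    static KafkaSource<String> fromLegacySettings(String kafkaBootstrapServers,
                                                  String kafkaGroupID,
                                                  String kafkaAutoOffsetReset,
                                                  String kafkaTopic) {
        // "earliest" / "latest" / "none" map onto Kafka's OffsetResetStrategy enum.
        OffsetResetStrategy reset =
                OffsetResetStrategy.valueOf(kafkaAutoOffsetReset.toUpperCase(Locale.ROOT));
        return KafkaSource.<String>builder()
                .setBootstrapServers(kafkaBootstrapServers)   // was "bootstrap.servers"
                .setGroupId(kafkaGroupID)                     // was "group.id"
                .setTopics(kafkaTopic)
                // Start from the group's committed offsets, falling back to the reset
                // strategy when none exist (was "auto.offset.reset").
                .setStartingOffsets(OffsetsInitializer.committedOffsets(reset))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Counterpart of setCommitOffsetsOnCheckpoints(true).
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();
    }

    public static void main(String[] args) {
        KafkaSource<String> source =
                fromLegacySettings("localhost:9092", "my-group", "earliest", "input-topic");
        // Unbounded unless setBounded(...) is called on the builder.
        System.out.println(source.getBoundedness());
    }
}
```

The resulting source is then attached with env.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka Source") rather than addSource().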


On Tue, Feb 1, 2022 at 08:55 Francesco Guardiani <
francesco@ververica.com> wrote:

Re: KafkaSource vs FlinkKafkaConsumer010

Posted by Francesco Guardiani <fr...@ververica.com>.
The latter link you posted refers to a very old Flink release. You should
use the first link, which refers to the latest release.

FG

On Tue, Feb 1, 2022 at 8:20 AM HG <ha...@gmail.com> wrote:

> Hello all
>
> I am confused.
> What is the difference between KafkaSource as defined in :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
> and FlinkKafkaConsumer010 as defined in
> https://nightlies.apache.org/flink/flink-docs-release-
> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>
> When should I use which?
>
> Regards Hans
>