You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/01 07:20:20 UTC
KafkaSource vs FlinkKafkaConsumer010
Hello all
I am confused.
What is the difference between KafkaSource as defined in :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
and FlinkKafkaConsumer010 as defined in
https://nightlies.apache.org/flink/flink-docs-release-
1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
<https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
When should I use which?
Regards Hans
Re: KafkaSource vs FlinkKafkaConsumer010
Posted by David Anderson <da...@apache.org>.
Before Kafka introduced their universal client, Flink had version-specific
connectors, e.g., for versions 0.8, 0.9, 0.10, and 0.11. Those were
eventually removed in favor of FlinkKafkaConsumer, which is/was backward
compatible back to Kafka version 0.10.
FlinkKafkaConsumer itself was deprecated in Flink 1.14 in favor of
KafkaSource, which implements the unified batch/streaming interface defined
in FLIP-27.
Regards,
David
On Tue, Feb 1, 2022 at 9:21 AM Francesco Guardiani <fr...@ververica.com>
wrote:
> I think the FlinkKakfaConsumer010 you're talking about is the old source
> api. You should use only KafkaSource now, as they use the new source
> infrastructure.
>
> On Tue, Feb 1, 2022 at 9:02 AM HG <ha...@gmail.com> wrote:
>
>> Hello Francesco
>> Perhaps I copied the wrong link of 1.2.
>> But there is also
>> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>
>> It seems there are 2 ways to use Kafka
>>
>> KafkaSource<String> source = KafkaSource.<String>builder()
>> .setBootstrapServers(brokers)
>> .setTopics("input-topic")
>> .setGroupId("my-group")
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setValueOnlyDeserializer(new SimpleStringSchema())
>> .build();
>>
>> And like this:
>>
>> Properties kafkaProperties = new Properties();
>> kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
>> kafkaProperties.put("group.id",kafkaGroupID);
>> kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
>> FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
>> kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>>
>>
>> There is even a FlinkKafkaConsumer011
>>
>> Which one is preferable ? Or have they different use cases?
>>
>> Regards Hans
>>
>>
>> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
>> francesco@ververica.com>:
>>
>>> The latter link you posted refers to a very old flink release. You shold
>>> use the first link, which refers to latest release
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:20 AM HG <ha...@gmail.com> wrote:
>>>
>>>> Hello all
>>>>
>>>> I am confused.
>>>> What is the difference between KafkaSource as defined in :
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>>>> and FlinkKafkaConsumer010 as defined in
>>>> https://nightlies.apache.org/flink/flink-docs-release-
>>>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>>>>
>>>> When should I use which?
>>>>
>>>> Regards Hans
>>>>
>>>
Re: KafkaSource vs FlinkKafkaConsumer010
Posted by Francesco Guardiani <fr...@ververica.com>.
I think the FlinkKakfaConsumer010 you're talking about is the old source
api. You should use only KafkaSource now, as they use the new source
infrastructure.
On Tue, Feb 1, 2022 at 9:02 AM HG <ha...@gmail.com> wrote:
> Hello Francesco
> Perhaps I copied the wrong link of 1.2.
> But there is also
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>
> It seems there are 2 ways to use Kafka
>
> KafkaSource<String> source = KafkaSource.<String>builder()
> .setBootstrapServers(brokers)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
>
> And like this:
>
> Properties kafkaProperties = new Properties();
> kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
> kafkaProperties.put("group.id",kafkaGroupID);
> kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
> FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
> kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>
>
> There is even a FlinkKafkaConsumer011
>
> Which one is preferable ? Or have they different use cases?
>
> Regards Hans
>
>
> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
> francesco@ververica.com>:
>
>> The latter link you posted refers to a very old flink release. You shold
>> use the first link, which refers to latest release
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:20 AM HG <ha...@gmail.com> wrote:
>>
>>> Hello all
>>>
>>> I am confused.
>>> What is the difference between KafkaSource as defined in :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>>> and FlinkKafkaConsumer010 as defined in
>>> https://nightlies.apache.org/flink/flink-docs-release-
>>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>>>
>>> When should I use which?
>>>
>>> Regards Hans
>>>
>>
Re: KafkaSource vs FlinkKafkaConsumer010
Posted by HG <ha...@gmail.com>.
Hello Francesco
Perhaps I copied the wrong link of 1.2.
But there is also
https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
It seems there are 2 ways to use Kafka
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
And like this:
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
kafkaProperties.put("group.id",kafkaGroupID);
kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer = new
FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(),
kafkaProperties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
There is even a FlinkKafkaConsumer011
Which one is preferable ? Or have they different use cases?
Regards Hans
Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
francesco@ververica.com>:
> The latter link you posted refers to a very old flink release. You shold
> use the first link, which refers to latest release
>
> FG
>
> On Tue, Feb 1, 2022 at 8:20 AM HG <ha...@gmail.com> wrote:
>
>> Hello all
>>
>> I am confused.
>> What is the difference between KafkaSource as defined in :
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>> and FlinkKafkaConsumer010 as defined in
>> https://nightlies.apache.org/flink/flink-docs-release-
>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>>
>> When should I use which?
>>
>> Regards Hans
>>
>
Re: KafkaSource vs FlinkKafkaConsumer010
Posted by Francesco Guardiani <fr...@ververica.com>.
The latter link you posted refers to a very old flink release. You shold
use the first link, which refers to latest release
FG
On Tue, Feb 1, 2022 at 8:20 AM HG <ha...@gmail.com> wrote:
> Hello all
>
> I am confused.
> What is the difference between KafkaSource as defined in :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
> and FlinkKafkaConsumer010 as defined in
> https://nightlies.apache.org/flink/flink-docs-release-
> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>
> When should I use which?
>
> Regards Hans
>