Posted to user@flink.apache.org by Mahima Agarwal <ma...@gmail.com> on 2022/01/24 07:35:31 UTC

Query regarding Kafka Source and Kafka Sink in 1.14.3

Hi Team,

I am trying to set the following properties on the KafkaSource API in Flink
1.14.3:
-> client.id.prefix
-> partition.discovery.interval.ms

But I am getting the following warnings in the TaskManager logs:

1. WARN  org.apache.kafka.clients.consumer.ConsumerConfig             [] -
The configuration 'client.id.prefix' was supplied but isn't a known config.
2. WARN  org.apache.kafka.clients.consumer.ConsumerConfig             [] -
The configuration 'partition.discovery.interval.ms' was supplied but isn't
a known config.

What could be the reason for this warning?

Also, in Flink 1.13.2 we were able to write the timestamp to Kafka using the
setWriteTimestampToKafka(true) method of the FlinkKafkaProducer class.
How can we do the same with the KafkaSink API in Flink 1.14.3?

Any leads would be appreciated.

Thanks and Regards
Mahima Agarwal

Re: Query regarding Kafka Source and Kafka Sink in 1.14.3

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

All properties you set by calling KafkaSource.builder().setProperty() are
also passed to the KafkaConsumer (see [1]). However, these two properties are
specific to Flink and unknown to Kafka, so Kafka logs a warning for each of
them. The warnings are harmless as long as the properties you set actually
take effect.
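
To make the mechanism concrete, here is a toy model in plain Java (no Flink or Kafka dependency). It is not Kafka's actual code: the class name, the method name and the short list of "known" keys are assumptions made up for illustration. It only mimics the idea that every supplied key the consumer does not recognize produces one warning line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

public class UnknownConfigDemo {
    // Illustrative subset of keys a Kafka consumer recognizes (NOT the full list).
    static final Set<String> KNOWN_CONSUMER_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id", "client.id", "auto.offset.reset"));

    // Returns one warning line per supplied key that is not in the known set.
    static List<String> unknownConfigWarnings(Properties supplied) {
        List<String> warnings = new ArrayList<>();
        // Sort the keys so the output order is deterministic.
        for (String key : new TreeSet<>(supplied.stringPropertyNames())) {
            if (!KNOWN_CONSUMER_KEYS.contains(key)) {
                warnings.add("The configuration '" + key
                        + "' was supplied but isn't a known config.");
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        // Flink-specific keys: consumed by the Flink connector, unknown to Kafka.
        props.setProperty("client.id.prefix", "demo-client");
        props.setProperty("partition.discovery.interval.ms", "10000");
        unknownConfigWarnings(props).forEach(System.out::println);
    }
}
```

Running main prints a warning for each of the two Flink-specific keys and nothing for the two keys in the known set, which matches the shape of the log lines in the question.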

As for writing the timestamp to Kafka: I'm not familiar with Kafka, but from
the code I guess that if you create a Kafka record serializer with
KafkaRecordSerializationSchema.builder(), it will write the timestamp to
Kafka by default. You can try out the example in [2] and see if it works.
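
If the builder's default turns out not to do what you need, one possible alternative is a custom KafkaRecordSerializationSchema that sets the timestamp on the ProducerRecord itself. The following is only a configuration sketch, assuming flink-connector-kafka 1.14.x on the classpath; the class names, topic name and bootstrap address are placeholders invented for illustration, and it has not been run against a cluster.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TimestampedKafkaSinkSketch {

    // Hypothetical schema: copies the Flink record timestamp into the Kafka record.
    static class TimestampedStringSchema implements KafkaRecordSerializationSchema<String> {
        private final String topic;

        TimestampedStringSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, KafkaSinkContext context, Long timestamp) {
            // ProducerRecord rejects negative timestamps; passing null leaves the
            // timestamp to be assigned on the Kafka side.
            Long kafkaTimestamp = (timestamp == null || timestamp < 0) ? null : timestamp;
            return new ProducerRecord<>(
                    topic,
                    (Integer) null,   // no explicit partition
                    kafkaTimestamp,
                    (byte[]) null,    // no key
                    element.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Placeholder address and topic; replace with your own values.
    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(new TimestampedStringSchema("output-topic"))
                .build();
    }
}
```

The timestamp argument is whatever timestamp the stream record carries when it reaches the sink, so records without an assigned timestamp fall back to the null branch.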

[1]
https://github.com/apache/flink/blob/e615106b38a289bc624a8554b86c83f9785352d3/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L85
[2]
https://github.com/apache/flink/blob/0bc2234b60d1a0635e238d18990695943158123c/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
