Posted to issues@flink.apache.org by "Luffy Tsai (JIRA)" <ji...@apache.org> on 2017/09/25 00:06:00 UTC

[jira] [Comment Edited] (FLINK-4004) Do not pass custom flink kafka connector properties to Kafka to avoid warnings

    [ https://issues.apache.org/jira/browse/FLINK-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16178409#comment-16178409 ] 

Luffy Tsai edited comment on FLINK-4004 at 9/25/17 12:05 AM:
-------------------------------------------------------------

Hi,


Before I started working on this, I thought the fix would be to define a filter function {{filterUnusedProperties}} and apply it at the line {{this.kafkaProperties = checkNotNull(kafkaProperties);}} in [KafkaConsumerThread.java#L132|https://github.com/apache/flink/blob/4afca4b3a13b61c2754bc839c77ba4d4eb1d2da2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L132], after which the warning log would disappear.
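
A minimal sketch of such a filter, under the assumption that every Flink-internal key starts with {{flink.}} (as {{flink.poll-timeout}} does). The class name {{KafkaPropertyFilter}} and the prefix rule are hypothetical and not taken from the Flink code base:

```java
import java.util.Properties;

final class KafkaPropertyFilter {

    // Assumption: Flink-internal keys share this prefix, e.g. "flink.poll-timeout".
    static final String FLINK_PREFIX = "flink.";

    /**
     * Returns a copy of the given properties without the keys that start with FLINK_PREFIX.
     * Only String keys with String values are copied (see Properties#stringPropertyNames).
     */
    static Properties filterUnusedProperties(Properties props) {
        Properties filtered = new Properties();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(FLINK_PREFIX)) {
                filtered.setProperty(key, props.getProperty(key));
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("flink.poll-timeout", "100");
        // Prints the keys that would still be handed to the Kafka consumer.
        System.out.println(filterUnusedProperties(props).stringPropertyNames());
    }
}
```

A prefix rule like this only works if all Flink-specific keys follow the same naming scheme, which is exactly the part that is hard to guarantee.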

However, I have run into some problems.
Both of the following approaches to obtaining the consumer config property names leave a lot of room for improvement.

h6. 1. Get config from Kafka ConsumerConfig.

Unfortunately, the public method {{configNames}} was only added in v0.10.1, so it can't be used in flink-connector-kafka-0.8 or flink-connector-kafka-0.9.
[ConsumerConfig#L417|https://github.com/apache/kafka/blob/2b663790733527488d7f33ebc47f383ff18b5a83/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L417]

Maybe we could get all public fields with the following code, but it's not an appropriate design.
{{Field[] fields = ConsumerConfig.class.getFields();}}
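
To make the reflection idea concrete, here is a rough sketch that collects the values of all public static final String fields whose names end in {{_CONFIG}}. To stay self-contained it runs against a hypothetical stand-in class ({{StandInConsumerConfig}}) instead of Kafka's {{ConsumerConfig}}, and the {{_CONFIG}} suffix rule is an assumption about the naming of the key constants:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

final class ConfigNameReflection {

    // Hypothetical stand-in for Kafka's ConsumerConfig, so the sketch compiles without the Kafka client.
    public static final class StandInConsumerConfig {
        public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
        public static final String GROUP_ID_CONFIG = "group.id";
        public static final String GROUP_ID_DOC = "documentation text, not a key";
        public static final int SOME_DEFAULT = 1;
    }

    /** Collects the values of public static final String fields named *_CONFIG. */
    static Set<String> configNames(Class<?> configClass) {
        Set<String> names = new TreeSet<>();
        for (Field field : configClass.getFields()) {
            int mods = field.getModifiers();
            boolean isConstant = Modifier.isStatic(mods) && Modifier.isFinal(mods);
            // Assumption: key constants are Strings whose field name ends with "_CONFIG".
            if (isConstant && field.getType() == String.class && field.getName().endsWith("_CONFIG")) {
                try {
                    names.add((String) field.get(null));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot read " + field.getName(), e);
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(configNames(StandInConsumerConfig.class));
    }
}
```

The result depends entirely on a field naming convention rather than on a public API, which is why this does not feel like a clean design.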

h6. 2. Maintain the consumer config in Flink.

It sounds workable but dirty.
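
Roughly, this would mean keeping a hand-written set of Kafka consumer keys inside the connector and retaining only those. The sketch below uses a deliberately incomplete sample of keys; the class and method names ({{MaintainedConsumerKeys}}, {{retainKnownKeys}}) are hypothetical:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

final class MaintainedConsumerKeys {

    // Hand-maintained and deliberately incomplete sample of Kafka consumer keys.
    // A real list would have to be kept in sync with every supported Kafka version.
    static final Set<String> KNOWN_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id", "auto.offset.reset", "enable.auto.commit")));

    /** Returns a copy that contains only the keys listed in KNOWN_KEYS. */
    static Properties retainKnownKeys(Properties props) {
        Properties kept = new Properties();
        for (String key : props.stringPropertyNames()) {
            if (KNOWN_KEYS.contains(key)) {
                kept.setProperty(key, props.getProperty(key));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "demo");
        props.setProperty("flink.poll-timeout", "100");
        System.out.println(retainKnownKeys(props).stringPropertyNames());
    }
}
```

The drawback is that any valid Kafka key missing from the list would be silently dropped.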


Could you give me some suggestions?
Thank you!


> Do not pass custom flink kafka connector properties to Kafka to avoid warnings
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-4004
>                 URL: https://issues.apache.org/jira/browse/FLINK-4004
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Luffy Tsai
>
> The FlinkKafkaConsumer has some custom properties, which we pass to the KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to log warnings about unused properties.
> We should not pass Flink-internal properties to Kafka, to avoid those warnings.


