Posted to issues@flink.apache.org by "Luffy Tsai (JIRA)" <ji...@apache.org> on 2017/09/25 00:06:00 UTC
[jira] [Comment Edited] (FLINK-4004) Do not pass custom flink kafka
connector properties to Kafka to avoid warnings
[ https://issues.apache.org/jira/browse/FLINK-4004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16178409#comment-16178409 ]
Luffy Tsai edited comment on FLINK-4004 at 9/25/17 12:05 AM:
-------------------------------------------------------------
Hi,
Before I started working, I thought the solution to the issue was to define a filter function {{filterUnusedProperties}}, apply it in the line {{this.kafkaProperties = checkNotNull(kafkaProperties);}} in [KafkaConsumerThread.java#L132|https://github.com/apache/flink/blob/4afca4b3a13b61c2754bc839c77ba4d4eb1d2da2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L132], and the warning log would disappear.
However, I have run into some problems.
Both of the following ways of getting the consumer config properties leave a lot of room for improvement.
h6. 1. Get config from Kafka ConsumerConfig.
Unfortunately, the public method {{configNames}} was only added in v0.10.1, so it can't be used in flink-connector-kafka-0.8 and flink-connector-kafka-0.9.
[ConsumerConfig#L417|https://github.com/apache/kafka/blob/2b663790733527488d7f33ebc47f383ff18b5a83/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L417]
Maybe we could get all public fields with the following code, but it's not an appropriate design pattern.
{{Field[] fields = ConsumerConfig.class.getFields();}}
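A minimal sketch of how that reflection idea might look, for illustration only. {{StandInConsumerConfig}} is a hypothetical stand-in for Kafka's {{ConsumerConfig}} so the example compiles without the Kafka client on the classpath. The helper name {{configNames}} and the assumption that key constants are public static final {{String}} fields whose names end in {{_CONFIG}} are assumptions of this sketch, not something the connector defines.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerConfig.
class StandInConsumerConfig {
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String GROUP_ID_CONFIG = "group.id";
    // Not a config key: the name does not end in _CONFIG, so it is skipped.
    public static final String GROUP_ID_DOC = "Identifies the consumer group.";
}

class ReflectedConfigNames {
    // Collects the values of all public static final String fields named *_CONFIG.
    static Set<String> configNames(Class<?> configClass) {
        Set<String> names = new TreeSet<>();
        for (Field field : configClass.getFields()) {
            int mods = field.getModifiers();
            boolean isStringConstant = Modifier.isStatic(mods)
                    && Modifier.isFinal(mods)
                    && field.getType() == String.class;
            if (!isStringConstant || !field.getName().endsWith("_CONFIG")) {
                continue;
            }
            try {
                // Static field, so no instance is needed.
                names.add((String) field.get(null));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read " + field.getName(), e);
            }
        }
        return names;
    }
}
```

The sketch depends on a naming convention rather than a documented API, which is the design concern raised above.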
h6. 2. Maintain the consumer config in Flink.
It sounds workable but dirty.
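A rough sketch of what maintaining the list inside Flink could look like, assuming a hand-written allow-list. The class name {{MaintainedConsumerKeys}} and the constant {{KNOWN_CONSUMER_KEYS}} are hypothetical, and the key list is only an illustrative subset. A real list would have to be kept in sync with every supported Kafka client version, which is where the "dirty" part comes from.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

class MaintainedConsumerKeys {
    // Illustrative subset only, maintained by hand (assumption of this sketch).
    static final Set<String> KNOWN_CONSUMER_KEYS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
                    "bootstrap.servers",
                    "group.id",
                    "auto.offset.reset",
                    "enable.auto.commit",
                    "key.deserializer",
                    "value.deserializer")));

    // Returns a copy containing only the keys on the allow-list.
    // Only String-valued entries are considered (see stringPropertyNames).
    static Properties filterUnusedProperties(Properties props) {
        Properties filtered = new Properties();
        for (String key : props.stringPropertyNames()) {
            if (KNOWN_CONSUMER_KEYS.contains(key)) {
                filtered.setProperty(key, props.getProperty(key));
            }
        }
        return filtered;
    }
}
```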
Could you give me some suggestions?
Thank you!
was (Author: paristsai):
Hi,
Before I started working, I thought the solution to the issue was to define a filter function {{filterUnusedProperties}}, apply it in the line {{this.kafkaProperties = checkNotNull(kafkaProperties);}} in [KafkaConsumerThread.java#L132|https://github.com/apache/flink/blob/4afca4b3a13b61c2754bc839c77ba4d4eb1d2da2/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L132], and the warning log would disappear.
However, I have run into some problems. The following methods are not good enough.
h6. 1. Get config from Kafka ConsumerConfig.
Unfortunately, the public method {{configNames}} was only added in v0.10.1, so it can't be used in flink-connector-kafka-0.8 and flink-connector-kafka-0.9.
[ConsumerConfig#L417|https://github.com/apache/kafka/blob/2b663790733527488d7f33ebc47f383ff18b5a83/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L417]
Maybe we could get all public fields with the following code, but it's not an appropriate design pattern.
{{Field[] fields = ConsumerConfig.class.getFields();}}
h6. 2. Maintain the consumer config in Flink.
It sounds workable but dirty.
Could you give me some suggestions?
Thank you!
> Do not pass custom flink kafka connector properties to Kafka to avoid warnings
> ------------------------------------------------------------------------------
>
> Key: FLINK-4004
> URL: https://issues.apache.org/jira/browse/FLINK-4004
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
> Assignee: Luffy Tsai
>
> The FlinkKafkaConsumer has some custom properties, which we pass to the KafkaConsumer as well (such as {{flink.poll-timeout}}). This causes Kafka to log warnings about unused properties.
> We should not pass Flink-internal properties to Kafka, to avoid those warnings.
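One possible way to do this, sketched under the assumption that every Flink-internal key shares the {{flink.}} prefix (as {{flink.poll-timeout}} does). The class name {{FlinkInternalPropertyFilter}} and the method {{withoutFlinkProperties}} are hypothetical and are not part of the connector.

```java
import java.util.Properties;

class FlinkInternalPropertyFilter {
    // Assumed prefix shared by Flink-internal keys such as flink.poll-timeout.
    static final String FLINK_PREFIX = "flink.";

    // Returns a copy of props without the Flink-internal keys; the input is left untouched.
    // Only String-valued entries are copied (see stringPropertyNames).
    static Properties withoutFlinkProperties(Properties props) {
        Properties forKafka = new Properties();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(FLINK_PREFIX)) {
                forKafka.setProperty(key, props.getProperty(key));
            }
        }
        return forKafka;
    }
}
```

Unlike an allow-list of Kafka keys, this does not need to know which keys each Kafka version accepts, but it only works if all custom properties follow the prefix convention.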