Posted to issues@flink.apache.org by "Leonid Ilyevsky (Jira)" <ji...@apache.org> on 2020/02/20 16:53:00 UTC

[jira] [Commented] (FLINK-12548) FlinkKafkaConsumer issues configuring underlying KafkaConsumer

    [ https://issues.apache.org/jira/browse/FLINK-12548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041134#comment-17041134 ] 

Leonid Ilyevsky commented on FLINK-12548:
-----------------------------------------

I am now testing with Flink version 1.10.0. I no longer see the "Error registering AppInfo mbean", which is good, though it was a minor issue.

The bigger issue with client.id is still there. The taskmanager log shows that the Kafka client is configured with the client.id I provided:
{code:java}
2020-02-20 11:37:26.384 [Kafka Fetcher for Source: Custom Source -> Map -> Sink: Unnamed (1/3)] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [va1qvllmsg01:9092, va1qvllmsg02:9092, va1qvllmsg03:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = cl-parser-phoenix-nj
        connections.max.idle.ms = 540000
......{code}
However, the client.id is not shown in the kafka-consumer-groups report.

For other consumers, such as those created by Mirror Maker, the kafka-consumer-groups utility shows CONSUMER-ID, HOST and CLIENT-ID. For the consumers from Flink, those fields are always empty.

> FlinkKafkaConsumer issues configuring underlying KafkaConsumer
> --------------------------------------------------------------
>
>                 Key: FLINK-12548
>                 URL: https://issues.apache.org/jira/browse/FLINK-12548
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: Leonid Ilyevsky
>            Assignee: Hugo Da Cruz Louro
>            Priority: Major
>         Attachments: KafkaError.txt
>
>
> I observe two issues that are possibly related, or at least belong to the same part of the code.
> I am running multiple instances of pipelines (by setting parallelism) in a Flink cluster.
> The first problem I see is the "Error registering AppInfo mbean"; see the attached stack trace [^KafkaError.txt]. From the trace I understand that it tries to use the consumer group id as the bean name; this fails because I have multiple instances using the same group id under the same task manager.
> The second problem is that the client id is not set at all, even though I provide the "client.id" property when invoking the FlinkKafkaConsumer constructor. This creates problems tracking consumers on the Kafka side. On the other hand, I do not want to set the same client id on all instances; preferably Flink should add a UUID suffix or something similar to make it unique.
> The same unique name could be used for that mbean, which would solve the first problem.
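
The unique-suffix idea from the description could look roughly like the sketch below. It uses only java.util classes. The class UniqueClientId and the helper withUniqueClientId are hypothetical names made up for illustration; they are not part of Flink or Kafka, and this is not how the connector is known to work. The "group.id" value is a placeholder. Note the assumption that such a helper would have to run once per parallel instance (for example inside the connector); if it were called once while building the job, every instance would presumably receive the same suffixed value.

```java
import java.util.Properties;
import java.util.UUID;

public class UniqueClientId {

    // Hypothetical helper (not a Flink or Kafka API): returns a copy of the
    // given properties in which "client.id", if present, gets a random UUID suffix.
    static Properties withUniqueClientId(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base); // copies explicitly set entries; the input is left untouched
        String clientId = base.getProperty("client.id");
        if (clientId != null) {
            copy.setProperty("client.id", clientId + "-" + UUID.randomUUID());
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "va1qvllmsg01:9092");
        props.setProperty("group.id", "example-group"); // placeholder value
        props.setProperty("client.id", "cl-parser-phoenix-nj");

        // Two calls stand in for two parallel consumer instances.
        Properties first = withUniqueClientId(props);
        Properties second = withUniqueClientId(props);

        // Each line prints the base client id followed by a different random suffix.
        System.out.println(first.getProperty("client.id"));
        System.out.println(second.getProperty("client.id"));
    }
}
```

The same generated value could in principle also serve as the unique mbean name mentioned above, though whether the Kafka client derives the mbean name this way is not confirmed here.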



--
This message was sent by Atlassian Jira
(v8.3.4#803005)