Posted to user-zh@flink.apache.org by wang guanglei <gl...@outlook.com> on 2021/08/23 02:06:34 UTC

Flink SQL fails to deserialize when consuming from Confluent Kafka

Hi all,
I recently ran into a problem where Flink SQL cannot connect to (consume from) Confluent Kafka:

Flink version: 1.12-csadh1.3.0.0

Cluster platform: Cloudera (CDP)

Kafka: Confluent Kafka

Symptom: consuming Kafka data with the following Flink SQL fails during deserialization:

The SQL statement is as follows:
[cid:a8372b16-f6a7-48c0-b19c-d85bf38c1163]
The error log is as follows:
java.io.IOException: Could not find schema with id 79 in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
    ... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
    ... 11 more
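
For context on where "schema with id 79" comes from: in the Confluent wire format, each message value starts with a one-byte magic value (0) followed by a four-byte big-endian schema ID, and the deserializer then asks the Schema Registry for that ID. The 401 in the trace is raised on that registry lookup, not while decoding the Avro payload itself. The following is only a minimal Python sketch of reading that header; the sample bytes and the function name read_schema_id are made up for illustration and are not taken from the topic in question.

```python
import struct

MAGIC_BYTE = 0  # first byte of a message framed in the Confluent wire format


def read_schema_id(message: bytes) -> int:
    """Return the schema ID stored in the 5-byte Confluent header."""
    if len(message) < 5:
        raise ValueError("message too short for the Confluent wire format")
    # ">bI": big-endian, one signed byte (magic) followed by an unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id


# Hypothetical message: header for schema ID 79 plus placeholder payload bytes.
sample = b"\x00" + (79).to_bytes(4, "big") + b"\x02\x06foo"
print(read_schema_id(sample))
```

If the header of a real message decodes to the ID reported in the exception, the message framing is fine and the failure is limited to the authenticated registry call.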

At first I thought I was using the options incorrectly, so I checked the Confluent Avro Format<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html#avro-confluent-basic-auth-credentials-source> section of the official documentation:
[cid:7a216800-f8ba-4285-959a-3efdbe1f9531]
I then removed the properties prefix from the auth-related options in the WITH clause:
[cid:2950843b-2263-49c2-8d8e-62e4d1793010]
This fails immediately with:
org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'.
Unsupported options:
avro-confluent.basic-auth.credentials-source
avro-confluent.basic-auth.user-info

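Additional notes:

With the same parameters, the DataStream API can consume the data. Other colleagues have been using this Kafka topic all along, so the data in Kafka should be fine.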
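
One way to double-check the credentials independently of Flink is to query the Schema Registry REST endpoint GET /schemas/ids/{id} with HTTP Basic authentication, using the same user:password string that is passed as the user-info value. The sketch below only builds the request; the registry URL and the credentials are placeholders, and build_schema_request is a hypothetical helper name.

```python
import base64
import urllib.request


def build_schema_request(registry_url: str, schema_id: int, user_info: str) -> urllib.request.Request:
    """Build a GET request for /schemas/ids/{id} with an HTTP Basic auth header.

    user_info is expected in the "user:password" form.
    """
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    url = f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


# Placeholder URL and credentials; replace with the real values before sending.
req = build_schema_request("https://registry.example.com", 79, "API_KEY:API_SECRET")
print(req.full_url)

# To actually send the request (needs network access to the registry):
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
# urlopen raises urllib.error.HTTPError with code 401 if the credentials are rejected.
```

If this request succeeds with the credentials used in the SQL job, the credentials themselves are valid, which would point to the options not reaching the registry client in the SQL path.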