You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by wang guanglei <gl...@outlook.com> on 2021/06/16 09:21:28 UTC

Flink Table API 消费Kafka时设置format异常

背景:
source: kafka
flink 版本:1.10
avro版本:1.10.0

代码:

bsTableEnv.connect(new Kafka()
                .version("universal")
                .topic(params.get("read-topic"))
                .startFromEarliest()
                .properties(this.properties)
        )
                .withFormat(
                        new ​Avro().recordClass(User.class) // 别人定义的User
                )
                .withSchema(schema)

// User 的继承关系:

import org.apache.avro.specific.SpecificRecordBase;

public class User extends SpecificRecordBase implements ***

错误日志:
Caused by: org.apache.flink.table.api.ValidationException: Could not get class 'com.***.User' for key 'format.record-class'. at org.apache.flink.table.descriptors.DescriptorProperties.lambda$getOptionalClass$4(DescriptorProperties.java:389) at java.util.Optional.map(Optional.java:215) at org.apache.flink.table.descriptors.DescriptorProperties.getOptionalClass(DescriptorProperties.java:378) at org.apache.flink.table.descriptors.DescriptorProperties.getClass(DescriptorProperties.java:398) at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:62) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:281) at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:161) at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53) ... 38 more

Caused by: org.apache.flink.table.api.ValidationException: Class 'com.***.User' does not extend from the required class 'org.apache.avro.specific.SpecificRecord' for key 'format.record-class'. at org.apache.flink.table.descriptors.DescriptorProperties.lambda$getOptionalClass$4(DescriptorProperties.java:385) ... 46 more

请问这个ValidationException是什么原因导致的呢?是不是因为User继承的SpecificRecord的版本和我本地使用的avro的版本不一致导致的呢?