Posted to user-zh@flink.apache.org by wang guanglei <gl...@outlook.com> on 2021/06/16 09:21:28 UTC
Flink Table API: format exception when consuming from Kafka
Background:
source: kafka
Flink version: 1.10
Avro version: 1.10.0
Code:
bsTableEnv.connect(
        new Kafka()
            .version("universal")
            .topic(params.get("read-topic"))
            .startFromEarliest()
            .properties(this.properties)
    )
    .withFormat(
        new Avro().recordClass(User.class) // User is defined by someone else
    )
    .withSchema(schema)
// Inheritance of User:
import org.apache.avro.specific.SpecificRecordBase;
public class User extends SpecificRecordBase implements ***
Error log:
Caused by: org.apache.flink.table.api.ValidationException: Could not get class 'com.***.User' for key 'format.record-class'.
    at org.apache.flink.table.descriptors.DescriptorProperties.lambda$getOptionalClass$4(DescriptorProperties.java:389)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.descriptors.DescriptorProperties.getOptionalClass(DescriptorProperties.java:378)
    at org.apache.flink.table.descriptors.DescriptorProperties.getClass(DescriptorProperties.java:398)
    at org.apache.flink.formats.avro.AvroRowFormatFactory.createDeserializationSchema(AvroRowFormatFactory.java:62)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:281)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:161)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
    ... 38 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'com.***.User' does not extend from the required class 'org.apache.avro.specific.SpecificRecord' for key 'format.record-class'.
    at org.apache.flink.table.descriptors.DescriptorProperties.lambda$getOptionalClass$4(DescriptorProperties.java:385)
    ... 46 more
What causes this ValidationException? Could it be that the version of SpecificRecord that User extends does not match the Avro version I am using locally?
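
One way to narrow this down might be to check where each class is actually loaded from. In Java, a class only counts as a subtype of an interface if both resolve to the same loaded class; if SpecificRecord were present in two jars or loaded by two class loaders, isAssignableFrom could return false even though the source declares the inheritance. Whether that is what happens in this job is an assumption, not something the log confirms. The sketch below is a hypothetical helper (the name ClassOriginCheck and its methods are made up for illustration) and uses JDK classes so that it runs on its own; in the real job one would pass User.class and org.apache.avro.specific.SpecificRecord.class instead.

```java
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper, not part of Flink or Avro.
public class ClassOriginCheck {

    /** Returns the jar/directory a class was loaded from, or a placeholder for JDK classes. */
    public static String origin(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        return src == null ? "<bootstrap/unknown>" : String.valueOf(src.getLocation());
    }

    /** Returns the class loader of a class, or a placeholder for the bootstrap loader. */
    public static String loader(Class<?> c) {
        ClassLoader cl = c.getClassLoader();
        return cl == null ? "<bootstrap>" : cl.toString();
    }

    /** Prints origin and loader of both classes and returns whether candidate is a subtype of required. */
    public static boolean report(Class<?> candidate, Class<?> required) {
        boolean assignable = required.isAssignableFrom(candidate);
        System.out.println("candidate : " + candidate.getName()
                + " | loader=" + loader(candidate) + " | origin=" + origin(candidate));
        System.out.println("required  : " + required.getName()
                + " | loader=" + loader(required) + " | origin=" + origin(required));
        System.out.println("assignable: " + assignable);
        return assignable;
    }

    public static void main(String[] args) {
        // Stand-ins for User.class and SpecificRecord.class (assumption: replace in the real job).
        report(ArrayList.class, List.class);
    }
}
```

If the two lines printed for the real classes show different loaders or different jar locations, that would point toward a duplicate or conflicting Avro dependency rather than a problem in User itself.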