You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by yidan zhao <hi...@gmail.com> on 2021/11/11 07:22:22 UTC
flink new source api, kafka部分对kafka-client的版本要求。
如题,当前新的kafaksouce貌似对kafka-client版本做了依赖,比如代码KafkaPartitionSplitReader.acquireAndSetStoppingOffsets方法中用到的
consumer.committed(partitionsStoppingAtCommitted)
.forEach(
(tp, offsetAndMetadata) -> {
Preconditions.checkNotNull(
offsetAndMetadata,
String.format(
"Partition %s should stop at
committed offset. "
+ "But there is no
committed offset of this partition for group %s",
tp, groupId));
stoppingOffsets.put(tp, offsetAndMetadata.offset());
});
在kafka-client2.2.0中是不符合语法的。committed方法签名不同。
Re: flink new source api, kafka部分对kafka-client的版本要求。
Posted by Shengkai Fang <fs...@gmail.com>.
我看pom 之中之前依赖的kafka的版本是 2.4.1,当前依赖的 kafka 版本是 2.8.1。应该是不支持低版本的 kafka。
Best,
Shengkai
yidan zhao <hi...@gmail.com> 于2021年11月11日周四 下午3:22写道:
>
> 如题,当前新的kafaksouce貌似对kafka-client版本做了依赖,比如代码KafkaPartitionSplitReader.acquireAndSetStoppingOffsets方法中用到的
>
> consumer.committed(partitionsStoppingAtCommitted)
> .forEach(
> (tp, offsetAndMetadata) -> {
> Preconditions.checkNotNull(
> offsetAndMetadata,
> String.format(
> "Partition %s should stop at
> committed offset. "
> + "But there is no
> committed offset of this partition for group %s",
> tp, groupId));
> stoppingOffsets.put(tp, offsetAndMetadata.offset());
> });
>
> 在kafka-client2.2.0中是不符合语法的。committed方法签名不同。
>