You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by yidan zhao <hi...@gmail.com> on 2021/11/11 07:22:22 UTC

flink new source api, kafka部分对kafka-client的版本要求。

如题,当前新的kafaksouce貌似对kafka-client版本做了依赖,比如代码KafkaPartitionSplitReader.acquireAndSetStoppingOffsets方法中用到的

consumer.committed(partitionsStoppingAtCommitted)
        .forEach(
                (tp, offsetAndMetadata) -> {
                    Preconditions.checkNotNull(
                            offsetAndMetadata,
                            String.format(
                                    "Partition %s should stop at
committed offset. "
                                            + "But there is no
committed offset of this partition for group %s",
                                    tp, groupId));
                    stoppingOffsets.put(tp, offsetAndMetadata.offset());
                });

在kafka-client2.2.0中是不符合语法的。committed方法签名不同。

Re: flink new source api, kafka部分对kafka-client的版本要求。

Posted by Shengkai Fang <fs...@gmail.com>.
我看pom 之中之前依赖的kafka的版本是 2.4.1,当前依赖的 kafka 版本是 2.8.1。应该是不支持低版本的 kafka。


Best,
Shengkai



yidan zhao <hi...@gmail.com> 于2021年11月11日周四 下午3:22写道:

>
> 如题,当前新的kafaksouce貌似对kafka-client版本做了依赖,比如代码KafkaPartitionSplitReader.acquireAndSetStoppingOffsets方法中用到的
>
> consumer.committed(partitionsStoppingAtCommitted)
>         .forEach(
>                 (tp, offsetAndMetadata) -> {
>                     Preconditions.checkNotNull(
>                             offsetAndMetadata,
>                             String.format(
>                                     "Partition %s should stop at
> committed offset. "
>                                             + "But there is no
> committed offset of this partition for group %s",
>                                     tp, groupId));
>                     stoppingOffsets.put(tp, offsetAndMetadata.offset());
>                 });
>
> 在kafka-client2.2.0中是不符合语法的。committed方法签名不同。
>