You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by op <52...@qq.com> on 2020/07/09 03:25:27 UTC

kafka connector问题

官网给的kafka table配置里的scan.startup.mode&nbsp;CREATE TABLE kafkaTable ( user_id BIGINT,  item_id BIGINT,  category_id BIGINT,  behavior STRING,  ts TIMESTAMP(3) ) WITH (  'connector' = 'kafka',  'topic' = 'user_behavior',  'properties.bootstrap.servers' = 'localhost:9092',  'properties.group.id' = 'testGroup',  'format' = 'csv',  'scan.startup.mode' = 'earliest-offset' )看了总共有以下几总'earliest-offset',&nbsp;'latest-offset',&nbsp;'group-offsets',&nbsp;'timestamp'&nbsp;and&nbsp;'specific-offsets'如果我作业重启的话选择group-offsets能否从上次消费到的位置开始?这种情况下需要配置提交offset到kafka broker相关的东西吗?有没有从savepoint保存的offset继续消费的配置?

Re: kafka connector问题

Posted by Benchao Li <li...@apache.org>.
首先,从checkpoint/savepoint
恢复的话,一定会以checkpoint/savepoint中的offset为准,所以它的优先级是最高的,
不管你配置哪种startup mode。
如果你没有开启checkpoint,那么如果你用了group-offsets,那它就会从保存在kafka中的offset进行启动。
提交offset到kafka这个应该是默认就开了的。

op <52...@qq.com> 于2020年7月9日周四 上午11:25写道:

> 官网给的kafka table配置里的scan.startup.mode&nbsp;CREATE TABLE kafkaTable (
> user_id BIGINT,  item_id BIGINT,  category_id BIGINT,  behavior STRING,  ts
> TIMESTAMP(3) ) WITH (  'connector' = 'kafka',  'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',  'properties.group.id'
> = 'testGroup',  'format' = 'csv',  'scan.startup.mode' = 'earliest-offset'
> )看了总共有以下几总'earliest-offset',&nbsp;'latest-offset',&nbsp;'group-offsets',&nbsp;'timestamp'&nbsp;and&nbsp;'specific-offsets'如果我作业重启的话选择group-offsets能否从上次消费到的位置开始?这种情况下需要配置提交offset到kafka
> broker相关的东西吗?有没有从savepoint保存的offset继续消费的配置?



-- 

Best,
Benchao Li