You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by op <52...@qq.com> on 2020/09/03 07:03:33 UTC

FlinkKafkaConsumer问题

&nbsp; &nbsp; hi,&nbsp; &nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&nbsp; &nbsp; 这有两个相同代码的程序:
//---------------------------
 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
 Env.setRestartStrategy(RestartStrategies.noRestart())
 val consumerProps = new Properties()
 consumerProps.put("bootstrap.servers", brokers)
 consumerProps.put("group.id", "test1234")

 val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
 Env.addSource(consumer).print()
 Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢

Re: FlinkKafkaConsumer问题

Posted by 赵一旦 <hi...@gmail.com>.
这个问题和flink关系不大。Kafka本身就是这么个特点,指定group,如果是订阅方式,会是你想象的那样,分享消息。但,如果是通过assign方式指定了消费哪个分区,则不受到group中消费者共享消息的限制。

SmileSmile <a5...@163.com> 于2020年9月5日周六 下午4:51写道:

> hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会?
>
> Best
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955993@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月04日 14:11,op 写道:
> 大概懂了 感谢
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> taochanglian@163.com&gt;;
> 发送时间:&nbsp;2020年9月4日(星期五) 中午11:54
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"Shuiqiang Chen"<
> acqua.csq@gmail.com&gt;;
>
> 主题:&nbsp;Re: FlinkKafkaConsumer问题
>
>
>
>
> 为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。
>
> 在 2020/9/4 10:34, Shuiqiang Chen 写道:
>
> &gt; Hi,
> &gt; 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source&nbsp; 算子维护当前算子所消费的
> partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint
> 恢复中从上次 commit 的位点开始消费,保证 exactly-once.&nbsp; 如果用 Kafka 消费组管理,那么
> FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka
> 消费组管理者记录,Flink 无法维护这些信息。
> &gt;
> &gt;&gt; 在 2020年9月4日,上午10:25,op <520075694@qq.com&gt; 写道:
> &gt;&gt;
> &gt;&gt;
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&amp;nbsp;
> &gt;&gt;
> &gt;&gt;
> &gt;&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt;&gt;
> 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <acqua.csq@gmail.com&amp;gt;;
> &gt;&gt; 发送时间:&amp;nbsp;2020年9月3日(星期四) 晚上6:09
> &gt;&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &gt;&gt;
> &gt;&gt; 主题:&amp;nbsp;Re: FlinkKafkaConsumer问题
> &gt;&gt;
> &gt;&gt;
> &gt;&gt;
> &gt;&gt; Hi op,
> &gt;&gt;
> &gt;&gt; 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic
> 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit
> 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka
> 服务端的一个角色。
> &gt;&gt;
> &gt;&gt; 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id
> commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset
> 开始消费。
> &gt;&gt;
> &gt;&gt; &amp;gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&amp;gt; 写道:
> &gt;&gt; &amp;gt;
> &gt;&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; hi,&amp;amp;nbsp;
> &amp;amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;amp;nbsp; &amp;amp;nbsp;
> 这有两个相同代码的程序:
> &gt;&gt; &amp;gt; //---------------------------
> &gt;&gt; &amp;gt; val bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> &gt;&gt; &amp;gt; Env.setRestartStrategy(RestartStrategies.noRestart())
> &gt;&gt; &amp;gt; val consumerProps = new Properties()
> &gt;&gt; &amp;gt; consumerProps.put("bootstrap.servers", brokers)
> &gt;&gt; &amp;gt; consumerProps.put("group.id", "test1234")
> &gt;&gt; &amp;gt;
> &gt;&gt; &amp;gt; val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
> &gt;&gt; &amp;gt; Env.addSource(consumer).print()
> &gt;&gt; &amp;gt;
> Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢

回复:FlinkKafkaConsumer问题

Posted by SmileSmile <a5...@163.com>.
hi,这种现象是在开checkpoint才出现的吗,还是没有开启也会?

Best




| |
a511955993
|
|
邮箱:a511955993@163.com
|

签名由 网易邮箱大师 定制

在2020年09月04日 14:11,op 写道:
大概懂了 感谢




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <taochanglian@163.com&gt;;
发送时间:&nbsp;2020年9月4日(星期五) 中午11:54
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"Shuiqiang Chen"<acqua.csq@gmail.com&gt;;

主题:&nbsp;Re: FlinkKafkaConsumer问题



为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。

在 2020/9/4 10:34, Shuiqiang Chen 写道:

&gt; Hi,
&gt; 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source&nbsp; 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once.&nbsp; 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。
&gt;
&gt;&gt; 在 2020年9月4日,上午10:25,op <520075694@qq.com&gt; 写道:
&gt;&gt;
&gt;&gt; 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&amp;nbsp;
&gt;&gt;
&gt;&gt;
&gt;&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt;&gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <acqua.csq@gmail.com&amp;gt;;
&gt;&gt; 发送时间:&amp;nbsp;2020年9月3日(星期四) 晚上6:09
&gt;&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;&gt;
&gt;&gt; 主题:&amp;nbsp;Re: FlinkKafkaConsumer问题
&gt;&gt;
&gt;&gt;
&gt;&gt;
&gt;&gt; Hi op,
&gt;&gt;
&gt;&gt; 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
&gt;&gt;
&gt;&gt; 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
&gt;&gt;
&gt;&gt; &amp;gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&amp;gt; 写道:
&gt;&gt; &amp;gt;
&gt;&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; hi,&amp;amp;nbsp; &amp;amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;amp;nbsp; &amp;amp;nbsp; 这有两个相同代码的程序:
&gt;&gt; &amp;gt; //---------------------------
&gt;&gt; &amp;gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
&gt;&gt; &amp;gt; Env.setRestartStrategy(RestartStrategies.noRestart())
&gt;&gt; &amp;gt; val consumerProps = new Properties()
&gt;&gt; &amp;gt; consumerProps.put("bootstrap.servers", brokers)
&gt;&gt; &amp;gt; consumerProps.put("group.id", "test1234")
&gt;&gt; &amp;gt;
&gt;&gt; &amp;gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
&gt;&gt; &amp;gt; Env.addSource(consumer).print()
&gt;&gt; &amp;gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢

回复: FlinkKafkaConsumer问题

Posted by op <52...@qq.com>.
大概懂了 感谢




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <taochanglian@163.com&gt;;
发送时间:&nbsp;2020年9月4日(星期五) 中午11:54
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"Shuiqiang Chen"<acqua.csq@gmail.com&gt;;

主题:&nbsp;Re: FlinkKafkaConsumer问题



为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。

在 2020/9/4 10:34, Shuiqiang Chen 写道:

&gt; Hi,
&gt; 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source&nbsp; 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once.&nbsp; 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。
&gt;
&gt;&gt; 在 2020年9月4日,上午10:25,op <520075694@qq.com&gt; 写道:
&gt;&gt;
&gt;&gt; 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&amp;nbsp;
&gt;&gt;
&gt;&gt;
&gt;&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt;&gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <acqua.csq@gmail.com&amp;gt;;
&gt;&gt; 发送时间:&amp;nbsp;2020年9月3日(星期四) 晚上6:09
&gt;&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;&gt;
&gt;&gt; 主题:&amp;nbsp;Re: FlinkKafkaConsumer问题
&gt;&gt;
&gt;&gt;
&gt;&gt;
&gt;&gt; Hi op,
&gt;&gt;
&gt;&gt; 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
&gt;&gt;
&gt;&gt; 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
&gt;&gt;
&gt;&gt; &amp;gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&amp;gt; 写道:
&gt;&gt; &amp;gt;
&gt;&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; hi,&amp;amp;nbsp; &amp;amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;amp;nbsp; &amp;amp;nbsp; 这有两个相同代码的程序:
&gt;&gt; &amp;gt; //---------------------------
&gt;&gt; &amp;gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
&gt;&gt; &amp;gt; Env.setRestartStrategy(RestartStrategies.noRestart())
&gt;&gt; &amp;gt; val consumerProps = new Properties()
&gt;&gt; &amp;gt; consumerProps.put("bootstrap.servers", brokers)
&gt;&gt; &amp;gt; consumerProps.put("group.id", "test1234")
&gt;&gt; &amp;gt;
&gt;&gt; &amp;gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
&gt;&gt; &amp;gt; Env.addSource(consumer).print()
&gt;&gt; &amp;gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢

Re: FlinkKafkaConsumer问题

Posted by taochanglian <ta...@163.com>.
为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。

在 2020/9/4 10:34, Shuiqiang Chen 写道:

> Hi,
> 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source  算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once.  如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。
>
>> 在 2020年9月4日,上午10:25,op <52...@qq.com> 写道:
>>
>> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp;
>>
>>
>> ------------------&nbsp;原始邮件&nbsp;------------------
>> 发件人:                                                                                                                        "user-zh"                                                                                    <acqua.csq@gmail.com&gt;;
>> 发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09
>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>>
>> 主题:&nbsp;Re: FlinkKafkaConsumer问题
>>
>>
>>
>> Hi op,
>>
>> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
>>
>> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
>>
>> &gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&gt; 写道:
>> &gt;
>> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序:
>> &gt; //---------------------------
>> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> &gt; Env.setRestartStrategy(RestartStrategies.noRestart())
>> &gt; val consumerProps = new Properties()
>> &gt; consumerProps.put("bootstrap.servers", brokers)
>> &gt; consumerProps.put("group.id", "test1234")
>> &gt;
>> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
>> &gt; Env.addSource(consumer).print()
>> &gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢


Re: FlinkKafkaConsumer问题

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi,
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source  算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once.  如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。

> 在 2020年9月4日,上午10:25,op <52...@qq.com> 写道:
> 
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp;
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <acqua.csq@gmail.com&gt;;
> 发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> 
> 主题:&nbsp;Re: FlinkKafkaConsumer问题
> 
> 
> 
> Hi op,
> 
> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
> 
> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
> 
> &gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&gt; 写道:
> &gt; 
> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序:
> &gt; //---------------------------
> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> &gt; Env.setRestartStrategy(RestartStrategies.noRestart())
> &gt; val consumerProps = new Properties()
> &gt; consumerProps.put("bootstrap.servers", brokers)
> &gt; consumerProps.put("group.id", "test1234")
> &gt; 
> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
> &gt; Env.addSource(consumer).print()
> &gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢


Re: FlinkKafkaConsumer问题

Posted by lec ssmi <sh...@gmail.com>.
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level
api。是这个意思吧。

op <52...@qq.com> 于2020年9月4日周五 上午10:25写道:

>
> 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp;
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> acqua.csq@gmail.com&gt;;
> 发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: FlinkKafkaConsumer问题
>
>
>
> Hi op,
>
> 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有
> partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到
> Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。
>
> 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit
> offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。
>
> &gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&gt; 写道:
> &gt;
> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp;
> 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序:
> &gt; //---------------------------
> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> &gt; Env.setRestartStrategy(RestartStrategies.noRestart())
> &gt; val consumerProps = new Properties()
> &gt; consumerProps.put("bootstrap.servers", brokers)
> &gt; consumerProps.put("group.id", "test1234")
> &gt;
> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new
> KafkaStringSchema,consumerProps).setStartFromLatest()
> &gt; Env.addSource(consumer).print()
> &gt;
> Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,
> group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer
> group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
> 谢谢

回复: FlinkKafkaConsumer问题

Posted by op <52...@qq.com>.
谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp;


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <acqua.csq@gmail.com&gt;;
发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: FlinkKafkaConsumer问题



Hi op,

在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。

另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。

&gt; 在 2020年9月3日,下午3:03,op <520075694@qq.com&gt; 写道:
&gt; 
&gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序:
&gt; //---------------------------
&gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
&gt; Env.setRestartStrategy(RestartStrategies.noRestart())
&gt; val consumerProps = new Properties()
&gt; consumerProps.put("bootstrap.servers", brokers)
&gt; consumerProps.put("group.id", "test1234")
&gt; 
&gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
&gt; Env.addSource(consumer).print()
&gt; Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢

Re: FlinkKafkaConsumer问题

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi op,

在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。

另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。

> 在 2020年9月3日,下午3:03,op <52...@qq.com> 写道:
> 
> &nbsp; &nbsp; hi,&nbsp; &nbsp; 我对FlinkKafkaConsumer的实现有点迷惑,&nbsp; &nbsp; 这有两个相同代码的程序:
> //---------------------------
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> Env.setRestartStrategy(RestartStrategies.noRestart())
> val consumerProps = new Properties()
> consumerProps.put("bootstrap.servers", brokers)
> consumerProps.put("group.id", "test1234")
> 
> val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest()
> Env.addSource(consumer).print()
> Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka 的consumer group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11 谢谢