Posted to user@flink.apache.org by op <52...@qq.com> on 2020/09/02 07:42:23 UTC
FlinkKafkaConsumer problem
Hi, I am confused about how FlinkKafkaConsumer handles consumer groups. I have two applications with the same code, like this:
//---------------------------
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")
val consumer = new FlinkKafkaConsumer[String](topic, new KafkaStringSchema, consumerProps).setStartFromLatest()
bsEnv.addSource(consumer).print()
bsEnv.execute()
//-----------------------------------
I then launch both. They use the same topic and group.id, and when I send some messages to the topic, I find that both applications consume all the data, which is not how a Kafka consumer group behaves. Can someone tell me why?
Re: FlinkKafkaConsumer problem
Posted by Till Rohrmann <tr...@apache.org>.
The reason two Flink jobs using a Kafka consumer with the same consumer
group are seeing the same events is that Flink's FlinkKafkaConsumer does
not participate in Kafka's consumer group management. Instead Flink
manually assigns all partitions to the source operators (on a per job
basis). The consumer group will only be used to commit the current
offset to the Kafka brokers.
Cheers,
Till
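The distinction Till describes can be seen in the plain Kafka client API, which offers both modes. The following is a minimal sketch, not Flink's actual source code: the object name, broker address, and topic name are hypothetical, and it assumes the kafka-clients library (2.0 or later) is on the classpath. A consumer that calls subscribe() takes part in group management and shares partitions with other members of the same group.id, while a consumer that calls assign() picks its partitions itself, so two such consumers with the same group.id can both read every partition.

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// On Scala 2.13 this import is deprecated in favour of scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._

object SubscribeVsAssign {
  // Hypothetical values, for illustration only.
  val brokers = "localhost:9092"
  val topic = "my-topic"

  def newConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("group.id", "test1234")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, String](props)
  }

  def main(args: Array[String]): Unit = {
    // Group-managed mode: once this consumer polls, the group coordinator
    // divides the topic's partitions among all subscribers with this group.id.
    val grouped = newConsumer()
    grouped.subscribe(Collections.singletonList(topic))

    // Manual mode: the consumer chooses its partitions itself. No rebalancing
    // happens, and group.id is only used when committing offsets.
    val manual = newConsumer()
    val allPartitions = manual.partitionsFor(topic).asScala
      .map(info => new TopicPartition(info.topic, info.partition))
      .asJava
    manual.assign(allPartitions)

    // Every process running this manual consumer sees records from all partitions.
    manual.poll(Duration.ofSeconds(1)).asScala.foreach(record => println(record.value))

    grouped.close()
    manual.close()
  }
}
```

Running two copies of the manual consumer against the same topic would reproduce the behaviour from the question: each copy reads all records, regardless of the shared group.id.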