Posted to user@flink.apache.org by op <52...@qq.com> on 2020/09/02 07:42:23 UTC

FlinkKafkaConsumer problem

Hi, I am confused about the consumer group behavior of FlinkKafkaConsumer. I have two applications with the same code, like this:
//---------------------------
import java.util.Properties
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")
// brokers, topic and KafkaStringSchema (a DeserializationSchema[String]) are defined elsewhere in the application
val consumer = new FlinkKafkaConsumer[String](topic, new KafkaStringSchema, consumerProps)
  .setStartFromLatest()
env.addSource(consumer).print()
env.execute()
//-----------------------------------

Then I launch both. They consume the same topic with the same group.id, and when I send some messages to the topic, I find that both applications consume all the data, which does not behave like a Kafka consumer group. Can someone tell me why?

Re: FlinkKafkaConsumer problem

Posted by Till Rohrmann <tr...@apache.org>.
The reason two Flink jobs using a Kafka consumer with the same consumer
group see the same events is that Flink's FlinkKafkaConsumer does
not participate in Kafka's consumer group management. Instead, Flink
manually assigns all partitions to the source operators (on a per-job
basis). The consumer group is only used to commit the current
offsets to the Kafka brokers.
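
To make that concrete, here is a rough Scala sketch of the idea (illustrative only, not Flink's actual assignment code; the object and method names are made up): each job independently maps every partition of the topic onto its own parallel subtasks, so both jobs end up reading every partition no matter which group.id is configured.

//-----------------------------------
object PartitionAssignmentSketch {
  // Illustrative only, not Flink's real implementation: every job computes
  // its own complete partition-to-subtask mapping, e.g. round-robin.
  def assignToSubtask(partition: Int, numSubtasks: Int): Int =
    partition % numSubtasks

  def main(args: Array[String]): Unit = {
    val partitions = 0 until 4
    // Two independent jobs, each with parallelism 2, both assign all partitions:
    for (job <- Seq("jobA", "jobB"); p <- partitions)
      println(s"$job: partition $p -> subtask ${assignToSubtask(p, 2)}")
    // Every partition shows up under both jobs, so both jobs see all records;
    // the group.id never influences this assignment.
  }
}
//-----------------------------------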

Cheers,
Till
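
For contrast, the partition-splitting behavior the question expected comes from Kafka's own group management, which a plain KafkaConsumer joins via subscribe(). Below is a minimal sketch assuming the standard kafka-clients API; the broker address, topic name and object name are placeholders. Start two copies of it with the same group.id and the broker divides the partitions between them, so each record is consumed by only one copy, whereas two copies of the Flink job above each receive everything, as explained in the reply.

//-----------------------------------
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object PlainConsumerGroupSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("group.id", "test1234")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    // subscribe() joins Kafka's group protocol: run this in two processes with
    // the same group.id and the broker splits the partitions between them.
    consumer.subscribe(Collections.singletonList("test-topic")) // placeholder topic
    while (true) {
      val records = consumer.poll(Duration.ofSeconds(1))
      records.forEach(r => println(s"partition=${r.partition()} value=${r.value()}"))
    }
  }
}
//-----------------------------------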
