Posted to user@flink.apache.org by Vishwas Siravara <vs...@gmail.com> on 2019/09/09 14:48:42 UTC

Using FlinkKafkaConsumer API

I am using the Flink Kafka connector, and this is my dependency:

"org.apache.flink" %% "flink-connector-kafka" % "1.7.0",


When I look at my dependency tree, the Kafka client version is

 -org.apache.kafka:kafka-clients:2.0.1 which comes from the above package.


However, when I run my code in the cluster, I see that the kafka-clients
version that is actually loaded is

 0.10.2.0


Here is the task executor log:

2019-09-09 03:05:56,825 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.10.2.0

I am struggling to find out where this dependency is coming from. Our
broker version is not compatible with this client. How can I force Flink
to use 2.0.1?


Also, the API I use for the Kafka consumer is:

  private[flink] def sourceType: FlinkKafkaConsumer[GenericRecord] = {
    val consumer = new FlinkKafkaConsumer[GenericRecord](
      source.asJava,
      AvroDeserialization.genericRecd,
      ExecutionEnv.streamProperties)
    consumer
  }


I would really appreciate any help. Is there any way I can find out where
this dependency comes from in the cluster? It is clearly not coming from
my application.
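
One check I could run from inside the job is sketched below. It is only a
minimal sketch that uses standard JDK reflection calls to report the
location (usually a jar path) a class was loaded from. The names
ClassOrigin and jarOf are hypothetical and are not part of Flink or Kafka.
In the real job I would pass
classOf[org.apache.kafka.common.utils.AppInfoParser] and log the result
from the task, on the assumption that the printed path points at the jar
that carries the 0.10.2.0 client.

```scala
object ClassOrigin {
  // Hypothetical helper: returns the location the given class was loaded
  // from, or None when the JVM does not expose one (for example, for
  // classes loaded by the bootstrap class loader).
  def jarOf(cls: Class[_]): Option[String] =
    for {
      domain   <- Option(cls.getProtectionDomain)
      source   <- Option(domain.getCodeSource)
      location <- Option(source.getLocation)
    } yield location.toString

  def main(args: Array[String]): Unit = {
    // This object stands in for the Kafka class so that the sketch runs
    // without Kafka on the classpath.
    println(jarOf(ClassOrigin.getClass))
    // A JDK core class, shown to illustrate the None case.
    println(jarOf(classOf[String]))
  }
}
```

If the reported path is under the cluster's own lib directory rather than
my application jar, that would suggest the old client is provided by the
cluster classpath and not by my build.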



Thanks,

Vishwas