Posted to issues@flink.apache.org by "Ahmet Gürbüz (Jira)" <ji...@apache.org> on 2022/04/22 02:52:00 UTC

[jira] [Updated] (FLINK-27348) Flink KafkaSource doesn't set groupId

     [ https://issues.apache.org/jira/browse/FLINK-27348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ahmet Gürbüz updated FLINK-27348:
---------------------------------
    Description: 
I have a very simple Flink application. I have installed Kafka locally and I am reading data from it with Flink, using the KafkaSource class. Although I set the group ID with setGroupId, this group ID does not appear in Kafka.

 
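{code:java}
import java.nio.charset.StandardCharsets
import scala.util.{Failure, Success, Try}

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord
// Assumption: Json and Reads come from play-json; the report does not name the library.
import play.api.libs.json.{Json, Reads}

object FlinkKafkaSource extends App {
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
  case class Event(partitionNo: Long, eventTime: String, eventTimestamp: Long, userId: String, firstName: String)
  implicit val readsEvent: Reads[Event] = Json.reads[Event]

  env
    .fromSource(KafkaSource.builder[Event]
      .setBootstrapServers("localhost:9092")
      .setTopics("flink-connection")
      .setGroupId("test-group") // I can't see this groupId in kafka-consumer-groups
      .setStartingOffsets(OffsetsInitializer.latest)
      .setDeserializer(new KafkaRecordDeserializationSchema[Event] {
        override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Event]): Unit = {
          // Decode the payload as UTF-8 (assumed producer encoding); byte-to-char mapping breaks on non-ASCII text
          val rec = new String(record.value, StandardCharsets.UTF_8)
          Try(Json.fromJson[Event](Json.parse(rec)).get) match {
            case Success(event) => out.collect(event)
            case Failure(exception) => println(s"Couldn't parse string: $rec, error: ${exception.toString}")
          }
        }
        override def getProducedType: TypeInformation[Event] = createTypeInformation[Event]
      })
      .build,
      WatermarkStrategy.noWatermarks[Event],
      "kafka-source"
    )
    .keyBy(l => l.userId)
    .print

  env.execute("flink-kafka-source")
}
{code}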
I have created a Kafka topic named "flink-connection".

I am using a simple kafka-python producer to write data to the flink-connection topic.

!image-2022-04-22-05-43-06-475.png|width=1259,height=266!
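
The producer script is only visible in the screenshot above, so the following is a hypothetical sketch of what such a kafka-python producer might look like, not the reporter's actual code. It assumes the messages are UTF-8 JSON objects whose keys match the fields of the Event case class in the Flink job. The helper names (make_event, encode_event, send_events) are invented for illustration; the topic and broker address are taken from the job above.

```python
import json


def make_event(partition_no, event_time, event_timestamp, user_id, first_name):
    """Build one event whose keys match the fields of the Scala Event case class."""
    return {
        "partitionNo": partition_no,
        "eventTime": event_time,
        "eventTimestamp": event_timestamp,
        "userId": user_id,
        "firstName": first_name,
    }


def encode_event(event):
    """Serialize an event to UTF-8 JSON bytes, the form the Flink job parses."""
    return json.dumps(event).encode("utf-8")


def send_events(events, topic="flink-connection", servers="localhost:9092"):
    """Send events to Kafka. Needs the kafka-python package and a running broker."""
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers, value_serializer=encode_event)
    for event in events:
        producer.send(topic, value=event)
    producer.flush()  # block until buffered records have been sent
    producer.close()
```

With a broker running locally, a call such as send_events([make_event(0, "2022-04-22 05:43:06", 1, "user-1", "Ahmet")]) would publish one record; the argument values here are placeholders, not data from the report.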

I am able to consume data from Kafka.

!image-2022-04-22-05-44-56-494.png!

But I can't see the group ID in the kafka-consumer-groups output.

!image-2022-04-22-05-46-45-592.png!

 

 


> Flink KafkaSource doesn't set groupId
> -------------------------------------
>
>                 Key: FLINK-27348
>                 URL: https://issues.apache.org/jira/browse/FLINK-27348
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala
>    Affects Versions: 1.14.4
>         Environment: OS: windows 8.1.
> Java version:
> java version "11.0.13" 2021-10-19 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
>  
>  
>            Reporter: Ahmet Gürbüz
>            Priority: Major
>         Attachments: image-2022-04-22-05-43-06-475.png, image-2022-04-22-05-44-56-494.png, image-2022-04-22-05-46-45-592.png



--
This message was sent by Atlassian Jira
(v8.20.7#820007)