You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Ahmet Gürbüz (Jira)" <ji...@apache.org> on 2022/04/22 02:49:00 UTC
[jira] [Created] (FLINK-27348) Flink KafkaSource doesn't set groupId
Ahmet Gürbüz created FLINK-27348:
------------------------------------
Summary: Flink KafkaSource doesn't set groupId
Key: FLINK-27348
URL: https://issues.apache.org/jira/browse/FLINK-27348
Project: Flink
Issue Type: Bug
Components: API / Scala
Affects Versions: 1.14.4
Environment: OS: windows 8.1.
Java version:
java version "11.0.13" 2021-10-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
Reporter: Ahmet Gürbüz
Attachments: image-2022-04-22-05-43-06-475.png, image-2022-04-22-05-44-56-494.png, image-2022-04-22-05-46-45-592.png
I have one very simple Flink application. I have installed kafka in my local and I am reading data from kafka with flink. I am using KafkaSource class in Flink. Although I have assigned GroupId with setGroupId, this groupId does not appear in Kafka.
{code:java}
object FlinkKafkaSource extends App {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
case class Event(partitionNo:Long, eventTime:String, eventTimestamp:Long, userId:String, firstName:String)
implicit val readsEvent: Reads[Event] = Json.reads[Event]
env
.fromSource(KafkaSource.builder[Event]
.setBootstrapServers("localhost:9092")
.setTopics("flink-connection")
.setGroupId("test-group") // I can't see this groupId in kafka-consumer-groups
.setStartingOffsets(OffsetsInitializer.latest)
.setDeserializer(new KafkaRecordDeserializationSchema[Event] {
override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Event]): Unit = {
val rec = record.value.map(_.toChar).mkString
Try(Json.fromJson[Event](Json.parse(rec)).get) match {
case Success(event) => out.collect(event)
case Failure(exception) => println(s"Couldn't parse string: $rec, error: ${exception.toString}")
}
}
override def getProducedType: TypeInformation[Event] = createTypeInformation[Event]
})
.build,
WatermarkStrategy.noWatermarks[Event],
"kafka-source"
)
.keyBy(l => l.userId)
.print
env.execute("flink-kafka-source")
} {code}
I have created a topic in kafka named "flink-connection".
I am using a simple kafka-python producer to produce data flink-connection topic.
!image-2022-04-22-05-43-06-475.png|width=762,height=161!
I am able to consume data from kafka.
!image-2022-04-22-05-44-56-494.png!
But can't see the groupId in kafka-consumer-groups
!image-2022-04-22-05-46-45-592.png!
--
This message was sent by Atlassian Jira
(v8.20.7#820007)