Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/03/31 20:53:00 UTC

[jira] [Commented] (SPARK-38715) Would be nice to be able to configure a client ID pattern in Kafka integration

    [ https://issues.apache.org/jira/browse/SPARK-38715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515587#comment-17515587 ] 

Apache Spark commented on SPARK-38715:
--------------------------------------

User 'cchantep' has created a pull request for this issue:
https://github.com/apache/spark/pull/36030

> Would be nice to be able to configure a client ID pattern in Kafka integration
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-38715
>                 URL: https://issues.apache.org/jira/browse/SPARK-38715
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Cédric Chantepie
>            Priority: Major
>
> By default, the Kafka client automatically generates a unique client ID.
> The client ID is used by many data lineage tools to identify consumers and producers. For consumers the consumer group is also used, but for producers only the client ID is available.
> Setting [client.id](https://kafka.apache.org/documentation/#producerconfigs_client.id) in the options passed to a Spark Kafka read or write is not possible, as it would force the same client.id on (at least) both the driver and the executors.
> Instead, Spark could accept a Spark-specific option, perhaps named `clientIdPrefix`.
> For example:
> ```scala
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("subscribePattern", "topic.*")
>   .option("startingOffsets", "earliest")
>   .option("endingOffsets", "latest")
>   .option("clientIdPrefix", "my-workflow-")
>   .load()
> ```
> A possible implementation would be to update [InternalKafkaProducerPool](https://github.com/apache/spark/blob/master/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala#L75), or perhaps Spark's `KafkaConfigUpdater`?
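
The proposed prefix mechanism could be sketched roughly as follows. This is a minimal, hypothetical illustration in plain Scala, not Spark's implementation: `ClientIdSketch`, `nextClientId` and `withClientId` are invented names, `clientIdPrefix` is only the option name proposed in the issue, and the only real Kafka name used is the `client.id` config key. For contrast, Spark passes `kafka.`-prefixed source/sink options through to the Kafka client, so `.option("kafka.client.id", "x")` gives every client the same fixed ID. The sketch instead derives a distinct ID per client from the prefix, a role label and a random UUID, and leaves an explicitly set `client.id` untouched.

```scala
import java.util.UUID

// Hypothetical helper, NOT part of Spark's API. It illustrates how a
// user-supplied prefix (the proposed `clientIdPrefix` option) could be turned
// into a distinct Kafka `client.id` for every consumer/producer instance.
object ClientIdSketch {
  // Builds "<prefix><role>-<uuid>", e.g. "my-workflow-executor-<uuid>".
  // A random UUID keeps IDs unique across JVMs (driver and executors),
  // while the prefix stays stable so lineage tools can group the clients.
  def nextClientId(prefix: String, role: String): String =
    s"$prefix$role-${UUID.randomUUID()}"

  // Returns a copy of `kafkaParams` with `client.id` derived from the prefix.
  // If no prefix is given, or the user already set `client.id` explicitly,
  // the parameters are returned unchanged.
  def withClientId(
      kafkaParams: Map[String, String],
      prefix: Option[String],
      role: String): Map[String, String] =
    prefix match {
      case Some(p) if !kafkaParams.contains("client.id") =>
        kafkaParams + ("client.id" -> nextClientId(p, role))
      case _ => kafkaParams
    }
}

object ClientIdSketchDemo {
  def main(args: Array[String]): Unit = {
    val base = Map("bootstrap.servers" -> "host1:port1,host2:port2")
    // Driver and executor clients share the prefix but get different IDs.
    val driverParams = ClientIdSketch.withClientId(base, Some("my-workflow-"), "driver")
    val executorParams = ClientIdSketch.withClientId(base, Some("my-workflow-"), "executor")
    println(driverParams("client.id"))
    println(executorParams("client.id"))
  }
}
```

Where such a hook would live in Spark (the producer pool, `KafkaConfigUpdater`, or elsewhere) is exactly the open question in the issue; the sketch only shows the ID-derivation logic.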



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
