You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/10/25 13:08:00 UTC

[jira] [Resolved] (SPARK-29500) Support partition column when writing to Kafka

     [ https://issues.apache.org/jira/browse/SPARK-29500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29500.
----------------------------------
    Fix Version/s: 3.0.0
       Resolution: Fixed

Issue resolved by pull request 26153
[https://github.com/apache/spark/pull/26153]

> Support partition column when writing to Kafka
> ----------------------------------------------
>
>                 Key: SPARK-29500
>                 URL: https://issues.apache.org/jira/browse/SPARK-29500
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: Nicola Bova
>            Assignee: Nicola Bova
>            Priority: Major
>              Labels: starter
>             Fix For: 3.0.0
>
>
> When writing to a Kafka topic, `KafkaWriter` does not support selecting the ouput kafka partition through a DataFrame column.
> While it is possible to configure a custom Kafka Partitioner with 
>  `.option({color:#6a8759}"kafka.partitioner.class"{color}{color:#cc7832}, {color}{color:#6a8759}"my.custom.Partitioner"{color})`, this is not enough for certain use cases.
> After the introduction of GDPR, it is a common pattern to emit records with unique Kafka keys, thus allowing to tombstone individual records.
> This strategy implies that the totality of the key information cannot be used to calculate the topic partition and users need to resort to custom partitioners.
> However, as stated at [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations], "Keys/Values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize keys/values into either strings or byte arrays."
> Therefore, a custom partitioner would need to
>  - deserialize the key (or value)
>  - calculate the output partition using a subset of the key (or value) fields
> This is inefficient because it requires an unnecessary deserialization step. It also makes it impossible to use Spark batch writer to send Kafka tombstones when the partition is calculated from a subset of the kafka value.
> It would be a nice addition to let the user choose a partition by setting a value in the "partition" column of the dataframe, as already done for `topic`, `key`, `value`, and `headers` in `KafkaWriter`, also mirroring the `ProducerRecord` API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org