Posted to issues@spark.apache.org by "Nicola Bova (Jira)" <ji...@apache.org> on 2019/10/17 11:07:00 UTC

[jira] [Updated] (SPARK-29500) Support partition column when writing to Kafka

     [ https://issues.apache.org/jira/browse/SPARK-29500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicola Bova updated SPARK-29500:
--------------------------------
    Labels: starter  (was: )

> Support partition column when writing to Kafka
> ----------------------------------------------
>
>                 Key: SPARK-29500
>                 URL: https://issues.apache.org/jira/browse/SPARK-29500
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: Nicola Bova
>            Priority: Major
>              Labels: starter
>
> When writing to a Kafka topic, `KafkaWriter` does not support selecting the output Kafka partition through a DataFrame column.
> While it is possible to configure a custom Kafka Partitioner with
> `.option("kafka.partitioner.class", "my.custom.Partitioner")`, this is not enough for certain use cases.
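> A minimal sketch of that configuration (broker address, topic name, and source columns are illustrative):
> {code:scala}
> df.selectExpr("CAST(id AS STRING) AS key", "CAST(payload AS STRING) AS value")
>   .write
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092")
>   .option("kafka.partitioner.class", "my.custom.Partitioner")
>   .option("topic", "events")
>   .save()
> {code}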
> After the introduction of GDPR, it is a common pattern to emit records with unique Kafka keys, so that individual records can be tombstoned.
> This strategy implies that the key cannot be used in its entirety to calculate the topic partition, so users need to resort to custom partitioners.
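> For illustration (the column names and key layout are assumptions, not from this ticket), such records and their tombstones could look like this, assuming a `spark` session is in scope:
> {code:scala}
> import spark.implicits._
>
> // key = "<userId>:<recordId>" is unique per record; only the userId
> // prefix should determine the partition.
> val record = Seq(("user42:rec-001", """{"userId":"user42","data":"..."}"""))
>   .toDF("key", "value")
>
> // Tombstone for that record: the same unique key with a null value.
> val tombstone = Seq(("user42:rec-001", null.asInstanceOf[String]))
>   .toDF("key", "value")
> {code}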
> However, as stated at [https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations], "Keys/Values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize keys/values into either strings or byte arrays."
> Therefore, a custom partitioner would need to:
> - deserialize the key (or value), and
> - calculate the output partition using a subset of the key (or value) fields.
> This is inefficient because it requires an unnecessary deserialization step. It also makes it impossible to use the Spark batch writer to send Kafka tombstones when the partition is calculated from a subset of the Kafka value: a tombstone's value is null, so the partitioner has nothing to deserialize.
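> A sketch of such a partitioner as it has to be written today (it reuses the hypothetical "<userId>:<recordId>" key layout from above); note the deserialization step:
> {code:scala}
> import java.util.{Map => JMap}
> import org.apache.kafka.clients.producer.Partitioner
> import org.apache.kafka.common.Cluster
> import org.apache.kafka.common.utils.Utils
>
> class MyCustomPartitioner extends Partitioner {
>   override def configure(configs: JMap[String, _]): Unit = ()
>
>   override def partition(topic: String, key: AnyRef, keyBytes: Array[Byte],
>                          value: AnyRef, valueBytes: Array[Byte],
>                          cluster: Cluster): Int = {
>     // The unnecessary step: Spark has already serialized the key, and it
>     // must be turned back into a string just to read the userId prefix.
>     val userId = new String(keyBytes, "UTF-8").takeWhile(_ != ':')
>     val numPartitions = cluster.partitionsForTopic(topic).size
>     Utils.toPositive(Utils.murmur2(userId.getBytes("UTF-8"))) % numPartitions
>   }
>
>   override def close(): Unit = ()
> }
> {code}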
> It would be a useful addition to let the user choose the partition by setting a value in a `partition` column of the DataFrame, as is already done for `topic`, `key`, `value`, and `headers` in `KafkaWriter`; this would also mirror the full `ProducerRecord` API.
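> With such a column, the proposed usage might look like the following sketch (column expressions and partition count are illustrative; everything except `partition` already works today):
> {code:scala}
> df.selectExpr(
>     "CAST(id AS BINARY) AS key",
>     "CAST(payload AS BINARY) AS value",
>     // the new part: the partition computed from a plain column
>     // (assumes the target topic has 10 partitions)
>     "pmod(hash(userId), 10) AS partition")
>   .write
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker:9092")
>   .option("topic", "events")
>   .save()
> {code}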



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org