Posted to jira@kafka.apache.org by "Gunnar Morling (Jira)" <ji...@apache.org> on 2019/10/10 09:41:00 UTC

[jira] [Commented] (KAFKA-7273) Converters should have access to headers.

    [ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948354#comment-16948354 ] 

Gunnar Morling commented on KAFKA-7273:
---------------------------------------

Hey [~rhauch], [~jcustenborder], just came across this change, which I'd have suggested myself otherwise :-) Very nice for a specific use case I have in mind.

One question for clarification, though: is the {{Headers}} object passed to {{fromConnectData()}} modifiable, i.e. can a converter add or override headers before the message is sent to Kafka? This is what I'd like to do, and it's not quite clear to me whether that's allowed. Modifying an input parameter as a side effect might be a bit surprising, given that the method also returns something.

If it is supported, I'll be happy to send a PR with a clarifying sentence in the method JavaDoc. Thanks!
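
To make the question concrete, here is a minimal, self-contained sketch of the side effect being asked about. {{SketchHeaders}}, {{SketchConverter}}, {{TypeTaggingConverter}} and the {{value.type}} header name are hypothetical stand-ins invented for illustration; they are not the Kafka Connect types, and the sketch does not answer whether the real {{Headers}} instance may be mutated.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a mutable header collection; NOT the Kafka class.
class SketchHeaders {
    private final Map<String, String> values = new LinkedHashMap<>();

    SketchHeaders add(String key, String value) {
        values.put(key, value); // adding an existing key overrides the earlier value
        return this;
    }

    String get(String key) {
        return values.get(key);
    }
}

// Hypothetical stand-in mirroring the shape proposed in the issue: the
// header-aware overload delegates to the header-less method by default.
interface SketchConverter {
    byte[] fromConnectData(String topic, Object value);

    default byte[] fromConnectData(String topic, SketchHeaders headers, Object value) {
        return fromConnectData(topic, value);
    }
}

// A converter that records type information in a header as a side effect
// while still returning the serialized payload.
class TypeTaggingConverter implements SketchConverter {
    @Override
    public byte[] fromConnectData(String topic, Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] fromConnectData(String topic, SketchHeaders headers, Object value) {
        String type = (value == null) ? "null" : value.getClass().getName();
        headers.add("value.type", type); // the mutation the question is about
        return fromConnectData(topic, value);
    }
}
```

The caller gets the payload from the return value and the extra type information from the mutated headers argument, which is the "returns something and also modifies its input" behaviour that a JavaDoc sentence would need to allow or forbid.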

> Converters should have access to headers.
> -----------------------------------------
>
>                 Key: KAFKA-7273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7273
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Jeremy Custenborder
>            Assignee: Jeremy Custenborder
>            Priority: Major
>             Fix For: 2.4.0
>
>
> I found myself wanting to build a converter that stores additional type information in headers. The Converter interface does not give a developer access to the headers. I'm not suggesting that we change how headers are serialized, but rather that *org.apache.kafka.connect.header.Headers* be passed in to *fromConnectData* and *toConnectData*. For example, something like this:
> {code:java}
> import java.util.Map;
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.data.SchemaAndValue;
> import org.apache.kafka.connect.header.Headers;
>
> public interface Converter {
>   // Proposed header-aware overloads; by default they ignore the headers and delegate.
>   default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
>     return fromConnectData(topic, schema, value);
>   }
>   default SchemaAndValue toConnectData(String topic, Headers headers, byte[] payload) {
>     return toConnectData(topic, payload);
>   }
>   // Existing methods, unchanged.
>   void configure(Map<String, ?> configs, boolean isKey);
>   byte[] fromConnectData(String topic, Schema schema, Object value);
>   SchemaAndValue toConnectData(String topic, byte[] value);
> }
> {code}
> This would be a similar approach to what was already done with ExtendedDeserializer and ExtendedSerializer in the Kafka client.


