Posted to users@kafka.apache.org by Jorg Heymans <jo...@gmail.com> on 2019/11/11 10:58:03 UTC

kafka-console-consumer --value-deserializer with access to headers

Hi,

I have created a class implementing Deserializer, providing an implementation for 

public String deserialize(String topic, Headers headers, byte[] data) 

that does some conditional processing based on headers, and then calls the other serde method

public String deserialize(String topic, byte[] data)

What I'm seeing is that kafka-console-consumer only uses the second method when a value deserializer is specified. Is there a way to force it to invoke the first method, so I can do processing with headers? I tried implementing the deprecated 'ExtendedDeserializer' but it does not make a difference.

Thanks,
Jorg
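
For readers following along, here is a minimal sketch of the kind of header-conditional deserializer described above. To keep it runnable without the Kafka client on the classpath, SketchDeserializer is a hypothetical stand-in for Kafka's Deserializer<T>, and a plain Map replaces org.apache.kafka.common.header.Headers. The "encoding" header and its "base64" value are invented for illustration; the original post does not say which headers are inspected.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Deserializer<T>. The real interface takes
// org.apache.kafka.common.header.Headers instead of a Map.
interface SketchDeserializer<T> {
    T deserialize(String topic, byte[] data);

    // Same shape as the real default method: ignore headers and delegate.
    default T deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
        return deserialize(topic, data);
    }
}

class HeaderAwareStringDeserializer implements SketchDeserializer<String> {

    // Plain variant: bytes are interpreted as UTF-8 text.
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    // Header-aware variant: conditional processing first, then the plain variant.
    @Override
    public String deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
        if (data == null) {
            return null;
        }
        // "encoding" is an assumed header name, not taken from the thread.
        byte[] encoding = headers == null ? null : headers.get("encoding");
        if (encoding != null && "base64".equals(new String(encoding, StandardCharsets.UTF_8))) {
            data = Base64.getDecoder().decode(data);
        }
        return deserialize(topic, data);
    }
}

class HeaderAwareDemo {
    public static void main(String[] args) {
        HeaderAwareStringDeserializer d = new HeaderAwareStringDeserializer();
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("encoding", "base64".getBytes(StandardCharsets.UTF_8));
        byte[] payload = Base64.getEncoder().encode("hello".getBytes(StandardCharsets.UTF_8));

        // With headers the payload is decoded; without them it is passed through.
        System.out.println(d.deserialize("my-topic", headers, payload));
        System.out.println(d.deserialize("my-topic", payload));
    }
}
```

A caller that only ever invokes the two-argument method skips the header check entirely, which is the behaviour reported for kafka-console-consumer in this thread.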

Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
Issue created here: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-9202


Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
Debugging into ConsoleConsumer.scala it eventually just calls this:

val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString.
        getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)

See
https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L514

So the deserialize method that takes headers will never be called by ConsoleConsumer. I will log an issue on this.

Thanks for helping to debug this!

Jorg
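
The dispatch behaviour found above can be reproduced in isolation. The sketch below is not Kafka code: DemoHeaders and DemoDeserializer are hypothetical stand-ins for Headers and Deserializer<T>, and convertLikeConsoleConsumer only imitates the shape of the quoted Scala line. It shows that a caller using the two-argument method never reaches an overridden three-argument method.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker type standing in for org.apache.kafka.common.header.Headers.
interface DemoHeaders {}

// Hypothetical stand-in mirroring the two deserialize methods of Deserializer<T>.
interface DemoDeserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, DemoHeaders headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// Records which overload was invoked, like the println calls in BasicDeserializer.
class TracingDeserializer implements DemoDeserializer<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public String deserialize(String topic, byte[] data) {
        calls.add("SERDE WITHOUT HEADERS");
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, DemoHeaders headers, byte[] data) {
        calls.add("SERDE WITH HEADERS");
        return new String(data, StandardCharsets.UTF_8);
    }
}

class HeaderDispatchDemo {
    // Imitates the quoted ConsoleConsumer line: only the two-argument call is made.
    static byte[] convertLikeConsoleConsumer(DemoDeserializer<?> d, String topic, byte[] bytes) {
        return d.deserialize(topic, bytes).toString().getBytes(StandardCharsets.UTF_8);
    }

    // A caller that has the record's headers at hand can use the other overload.
    static byte[] convertWithHeaders(DemoDeserializer<?> d, String topic,
                                     DemoHeaders headers, byte[] bytes) {
        return d.deserialize(topic, headers, bytes).toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        TracingDeserializer d = new TracingDeserializer();
        byte[] value = "some value".getBytes(StandardCharsets.UTF_8);

        convertLikeConsoleConsumer(d, "my-topic", value);
        convertWithHeaders(d, "my-topic", new DemoHeaders() {}, value);

        // Prints one entry per call, in invocation order.
        d.calls.forEach(System.out::println);
    }
}
```

Whether a fix should pass the record's headers through in this way is a question for the JIRA issue; the sketch only illustrates why the override is bypassed.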



Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
Hi,

Sorry for the late reply. Here is the command:

kafka-console-consumer.cmd --consumer.config my-config.properties --bootstrap-server kafka-host:9443 --topic my-topic --value-deserializer my.BasicDeserializer --group my-console-group --property print.key=true

my-config.properties only contains SSL-related config.

Jorg


Re: kafka-console-consumer --value-deserializer with access to headers

Posted by "M. Manna" <ma...@gmail.com>.
Hi,

On Tue, 12 Nov 2019 at 14:37, Jorg Heymans <jo...@gmail.com> wrote:

> Thanks for helping debug this. You can reproduce the issue using the
> deserializer below, invoking kafka-console-consumer with
> --value-deserializer=my.BasicDeserializer. As you will see, when the
> consumer starts receiving messages, only "SERDE WITHOUT HEADERS" is printed
> to the console.
>
> Thanks,
> Jorg
>
What exact command did you run from the command prompt? Can you
share it with us?

Thanks,

>
> public class BasicDeserializer implements Deserializer<String> {
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         System.out.println("CONFIGURE");
>     }
>
>     @Override
>     public String deserialize(String topic, byte[] data) {
>         System.out.println("SERDE WITHOUT HEADERS");
>         return new String(data);
>     }
>
>     @Override
>     public String deserialize(String topic, Headers headers, byte[] data) {
>         System.out.println("SERDE WITH HEADERS");
>         return new String(data);
>     }
>
>     @Override
>     public void close() {
>         System.out.println("CLOSE");
>     }
> }

Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
Thanks for helping debug this. You can reproduce the issue using the deserializer below, invoking kafka-console-consumer with --value-deserializer=my.BasicDeserializer. As you will see, when the consumer starts receiving messages, only "SERDE WITHOUT HEADERS" is printed to the console.

Thanks,
Jorg

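package my;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Minimal reproduction: every method logs when it is invoked.
public class BasicDeserializer implements Deserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("CONFIGURE");
    }

    // Variant without headers: the only one kafka-console-consumer invokes.
    @Override
    public String deserialize(String topic, byte[] data) {
        System.out.println("SERDE WITHOUT HEADERS");
        return new String(data, StandardCharsets.UTF_8);
    }

    // Variant with headers: its message never appears on the console.
    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        System.out.println("SERDE WITH HEADERS");
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        System.out.println("CLOSE");
    }
}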





Re: kafka-console-consumer --value-deserializer with access to headers

Posted by "M. Manna" <ma...@gmail.com>.
Hi again,

On Tue, 12 Nov 2019 at 12:31, Jorg Heymans <jo...@gmail.com> wrote:

> Hi,
>
> The issue is not that I cannot get a custom deserializer working; it's
> that the custom deserializer I provide overrides the default method from
> the Deserializer interface
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> that gives access to record Headers.
>
> The kafka console consumer never calls this method, it will only call the
> variant without Headers
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
>
> I'm using kafka 2.3.0 btw.
>
> Jorg
>

Record fetching (the deserialization call) happens in Fetcher, and Fetcher
calls Deserializer.deserialize() with headers. The default implementation
of that method simply delegates to deserialize() without headers. If you
override both the header and non-header versions, your overrides will be
called.

https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265

https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268

Console consumer simply puts a consumer wrapper around KafkaConsumer. There
is no change in behaviour otherwise. I take it that you've debugged and
confirmed that it's not calling your overridden deserialize() with headers?
If so, can you link it here for everyone's benefit?

Thanks,






Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
Hi,

The issue is not that I cannot get a custom deserializer working; it's that the custom deserializer I provide overrides the default method from the Deserializer interface https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59 that gives access to record Headers.

The Kafka console consumer never calls this method; it only calls the variant without Headers https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50

I'm using Kafka 2.3.0, btw.

Jorg



Re: kafka-console-consumer --value-deserializer with access to headers

Posted by "M. Manna" <ma...@gmail.com>.
Hi

On Tue, 12 Nov 2019 at 09:53, Jorg Heymans <jo...@gmail.com> wrote:

> Indeed, I corrected the typo, but now my deserializer class is not taken
> into account at all and it falls back to the default deserializer. You can
> verify this by specifying a non-existent class: it still runs fine.
>
> value.deserializer=does.not.exist
>
In ConsoleConsumer, the bootstrap server and the key/value deserializers are
enforced via the --consumer-property argument. It aggregates all properties
from --consumer-property and --consumer.config, and prioritises key-value
pairs supplied via --consumer-property over the properties file.
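
The precedence rule described above can be illustrated with plain java.util.Properties. This is only a sketch of the stated rule, not ConsoleConsumer's actual code; the class and method names are made up for illustration, and the property values are examples taken from or modelled on this thread.

```java
import java.util.Properties;

class ConsumerPropsMerge {

    // Later sources win: file-based properties are loaded first, then the
    // key=value pairs given on the command line overwrite any duplicates.
    static Properties merge(Properties fromConfigFile, Properties fromCommandLine) {
        Properties merged = new Properties();
        merged.putAll(fromConfigFile);
        merged.putAll(fromCommandLine);
        return merged;
    }

    public static void main(String[] args) {
        // Stands in for the contents of a --consumer.config file.
        Properties file = new Properties();
        file.setProperty("value.deserializer", "does.not.exist");
        file.setProperty("security.protocol", "SSL");

        // Stands in for a --consumer-property key=value pair.
        Properties commandLine = new Properties();
        commandLine.setProperty("value.deserializer", "my.BasicDeserializer");

        Properties merged = merge(file, commandLine);
        System.out.println(merged.getProperty("value.deserializer"));
        System.out.println(merged.getProperty("security.protocol"));
    }
}
```

Under this rule, a value.deserializer entry in the file is replaced by the command-line value, while file entries without a command-line counterpart are kept.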

I think you can try the following to get your implementation working:

1) Put the SerDe classes on the classpath
2) Provide your consumer config file
3) Provide the key/value deserializer properties via the --consumer-property argument.

See how that works for you.

Thanks,


Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
Indeed, I corrected the typo, but now my deserializer class is not taken into account at all and it falls back to the default deserializer. You can verify this by specifying a non-existent class: it still runs fine.

value.deserializer=does.not.exist

Jorg


Re: kafka-console-consumer --value-deserializer with access to headers

Posted by "M. Manna" <ma...@gmail.com>.
You have a typo: you mean 'value.deserializer', not 'value.serializer'.

Please try again.

Regards,

On Mon, 11 Nov 2019 at 14:28, Jorg Heymans <jo...@gmail.com> wrote:

> Don't think that option is available there, specifying
> 'value.deserializer' in my consumer-config.properties file gives
>
> [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was
> supplied but isn't a known config.
> (org.apache.kafka.clients.consumer.ConsumerConfig)
>
> Does there exist a description of what properties the consumer-config
> properties file accepts ? I could find only a few references to it in the
> documentation.
>
> Jorg
>

Re: kafka-console-consumer --value-deserializer with access to headers

Posted by Jorg Heymans <jo...@gmail.com>.
I don't think that option is available there. Specifying 'value.deserializer' in my consumer-config.properties file gives:

[2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)

Is there a description of which properties the consumer config properties file accepts? I could find only a few references to it in the documentation.

Jorg


Re: kafka-console-consumer --value-deserializer with access to headers

Posted by "M. Manna" <ma...@gmail.com>.
Hi,


On Mon, 11 Nov 2019 at 10:58, Jorg Heymans <jo...@gmail.com> wrote:

> Hi,
>
> I have created a class implementing Deserializer, providing an
> implementation for
>
> public String deserialize(String topic, Headers headers, byte[] data)
>
> that does some conditional processing based on headers, and then calls the
> other serde method
>
> public String deserialize(String topic, byte[] data)
>
> What I'm seeing is that kafka-console-consumer only uses the second method
> when a value deserializer is specified. Is there a way to force it to
> invoke the first method, so I can do processing with headers? I tried
> implementing the deprecated 'ExtendedDeserializer' but it does not make a
> difference.
>
> Thanks,
> Jorg
>

Have you tried providing a separate properties file using the
--consumer.config argument? Please see the reference here:

--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.

Try that and see how it goes.

Thanks,