Posted to user@flink.apache.org by Olga Luganska <tr...@hotmail.com> on 2018/10/10 02:28:23 UTC

FlinkKafkaProducer and Confluent Schema Registry

Hello,

I would like to use Confluent Schema Registry in my streaming job.
I was able to make it work with the help of a generic Kafka producer and a FlinkKafkaConsumer that uses ConfluentRegistryAvroDeserializationSchema.


FlinkKafkaConsumer011<GenericRecord> consumer = new FlinkKafkaConsumer011<>(
        MY_TOPIC,
        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, SCHEMA_URI),
        kafkaProperties);

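For reference, the surrounding setup looks roughly like the following sketch; the topic name, schema string, registry URL, and consumer properties are illustrative placeholders, not values from my actual job:

// Assumes org.apache.avro.Schema and java.util.Properties are imported.
String MY_TOPIC = "my-topic";
String SCHEMA_URI = "http://localhost:8081";

// A placeholder Avro schema, parsed from its JSON definition.
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyRecord\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "my-consumer-group");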

My question: is it possible to implement producer logic in the FlinkKafkaProducer that serializes the message and stores the schema id in the Confluent Schema Registry?


I don't think this is going to work with the current interface, because creation and caching of the schema id in the Confluent Schema Registry is done by io.confluent.kafka.serializers.KafkaAvroSerializer, and all FlinkKafkaProducer constructors take either a SerializationSchema or a KeyedSerializationSchema (part of Flink's own serialization stack) as a parameter.
If my assumption is wrong, could you please provide details of implementation?
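For illustration, the kind of wrapper I have in mind would look roughly like the sketch below. It wraps Confluent's KafkaAvroSerializer in a KeyedSerializationSchema; the class name and the lazy initialization are my own assumptions, not an official Flink API:

import java.util.Collections;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class ConfluentAvroKeyedSerializationSchema
        implements KeyedSerializationSchema<GenericRecord> {

    private final String topic;
    private final String schemaRegistryUrl;

    // KafkaAvroSerializer is not java.io.Serializable, so it is created
    // lazily on the task managers instead of on the client.
    private transient KafkaAvroSerializer valueSerializer;

    public ConfluentAvroKeyedSerializationSchema(String topic, String schemaRegistryUrl) {
        this.topic = topic;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    private KafkaAvroSerializer serializer() {
        if (valueSerializer == null) {
            valueSerializer = new KafkaAvroSerializer();
            valueSerializer.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
                    false); // false = configure as a value serializer
        }
        return valueSerializer;
    }

    @Override
    public byte[] serializeKey(GenericRecord element) {
        return null; // records are written without a key in this sketch
    }

    @Override
    public byte[] serializeValue(GenericRecord element) {
        // serialize() registers the schema on first use (cached afterwards)
        // and prepends the Confluent wire-format header (magic byte + schema id).
        return serializer().serialize(topic, element);
    }

    @Override
    public String getTargetTopic(GenericRecord element) {
        return topic;
    }
}

Such a schema could then be passed to the producer as
new FlinkKafkaProducer011<>(MY_TOPIC, new ConfluentAvroKeyedSerializationSchema(MY_TOPIC, SCHEMA_URI), kafkaProperties).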
Thank you very much,
Olga

Re: FlinkKafkaProducer and Confluent Schema Registry

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Olga,

The only thing I can tell is that it definitely won't make it into the
1.7 release. The earliest possible version is then 1.8, which is
scheduled for the beginning of next year.

Best,

Dawid



Re: FlinkKafkaProducer and Confluent Schema Registry

Posted by Olga Luganska <tr...@hotmail.com>.
Dawid,

Is there a projected date to deliver ConfluentRegistryAvroSerializationSchema?

thank you,
Olga


Re: FlinkKafkaProducer and Confluent Schema Registry

Posted by Dawid Wysakowicz <wy...@gmail.com>.
Hi Olga,
There is an open PR [1] with some in-progress work on a corresponding
AvroSerializationSchema; you can have a look at it. The bigger issue there
is that SerializationSchema does not have access to the event's key, so
using a topic pattern might be problematic.
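To make the limitation concrete, these are roughly the two interface
shapes (paraphrased from the Flink 1.x sources):

// Plain SerializationSchema: only the value is visible, so a serializer
// that needs the record key (or a per-record topic) cannot be expressed.
public interface SerializationSchema<T> extends java.io.Serializable {
    byte[] serialize(T element);
}

// KeyedSerializationSchema additionally exposes the key and target topic.
public interface KeyedSerializationSchema<T> extends java.io.Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}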
Best,
Dawid

[1] https://github.com/apache/flink/pull/6259


Re: FlinkKafkaProducer and Confluent Schema Registry

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Olga,

Sorry for the late reply.
I think that Gordon (cc’ed) should be able to answer your question.

Cheers,
Kostas


Re: FlinkKafkaProducer and Confluent Schema Registry

Posted by Olga Luganska <tr...@hotmail.com>.
Any suggestions?

Thank you

Sent from my iPhone
