Posted to user@flink.apache.org by lan tran <in...@gmail.com> on 2022/04/21 06:43:32 UTC

AvroRowDeserializationSchema

Hi team,

I want to use AvroRowDeserializationSchema when consuming data from Kafka;
however, from the documentation I did not understand what avro_schema_string
and record_class are. It would be great if you could give me an example of
this (I only have the Java example; I am doing it using PyFlink).

Is my understanding correct that avro_schema_string is the schema_registry_url?
Does it support 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}'
like in the Table API?

Best,
Quynh.
  





Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Oh, I just missed your last question, sorry for that. The offset is stored
in the checkpoint, and the job will recover the offset from the checkpoint
when it fails over.

Things you may need to pay attention to (a sketch follows below):
1) Enable checkpointing and configure it if necessary [1]
2) Specify the startup mode via `scan.startup.mode` for the Kafka connector;
it takes effect when the job starts from scratch and there is no offset
available to use
3) The job will restore from the latest checkpoint when it fails over.
However, when you manually suspend/start a job, you need to specify the
checkpoint/savepoint manually. See [2][3][4] for more details.

Things done by Flink (so you don't need to care):
1) Checkpointing and restoring the offset
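
For example, a minimal PyFlink sketch of 1) and 2); the topic, fields and
addresses are placeholders:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# 1) enable checkpointing, e.g. every 30 seconds
env.enable_checkpointing(30000)

t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# 2) `scan.startup.mode` only takes effect when the job starts without an
#    offset in a checkpoint/savepoint to restore from
t_env.execute_sql("""
    CREATE TABLE source_table (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")
```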

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#creating-a-savepoint
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
[4]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#starting-a-job-from-a-savepoint



On Thu, Apr 28, 2022 at 5:45 PM lan tran <in...@gmail.com> wrote:

> Didn’t expect that answer =))
> However, I really appreciate everything you did.
> Thanks again for helping me out.
>
> Best,
> Quynh.
>
>
>
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Thursday, April 28, 2022 2:59 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Yes, I think so~
>
>
>
> On Thu, Apr 28, 2022 at 11:00 AM lan tran <in...@gmail.com>
> wrote:
>
> Hi Dian,
>
> Sorry for missing your mail. If I follow your suggestion and Flink somehow
> crashes and we have to restart the service, does the Flink job know the
> offset from which it should resume reading from Kafka?
>
>
>
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Tuesday, April 26, 2022 7:54 AM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> The same code in my last reply showed how to set the UID for the source
> operator generated using the Table API. I meant that you could first create
> a source using the Table API, then convert it to a DataStream, set the uid
> for the source operator using the same code above, and then perform
> operations with the DataStream API. (A sketch of this follows below.)
>
> Regards,
> Dian
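>
> For example, a minimal sketch, reusing the hypothetical `source_table` from
> the checkpointing sketch near the top of this thread and the
> `set_uid_for_source` helper quoted further below:
>
> ```
> table = t_env.from_path("source_table")
> # convert the Table to a DataStream, then pin the source operator's uid
> ds = t_env.to_data_stream(table)
> set_uid_for_source(ds, "kafka-source")
> # ...continue with DataStream API operations on `ds`...
> ```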
>
>
>
> On Mon, Apr 25, 2022 at 9:27 PM lan tran <in...@gmail.com> wrote:
>
> Hi Dian,
>
> Thanks again for the fast response.
>
> Per your suggestion above, we can set the UID only for the DataStream part
> (converting from a table to a data stream, as you suggest).
>
> However, in the first phase, which collects the data from Kafka (in
> Debezium format), the UID cannot be set since we are using the Table API
> (which auto-generates the UID).
>
> Therefore, if there is a crash or a revert via savepoint is needed, we
> cannot do it in the first phase since we cannot set the UID there => so
> how can we revert it?
>
> As a result, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job to be able
> to use the savepoint for the whole flow.
>
> Best,
> Quynh
>
>
>
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (though it may be a little hacky):
> ```
> def set_uid_for_source(ds: DataStream, uid: str):
>     transformation = ds._j_data_stream.getTransformation()
>
>     # walk back through the transformation graph to the source transformation
>     source_transformation = transformation
>     while not source_transformation.getInputs().isEmpty():
>         source_transformation = source_transformation.getInputs().get(0)
>
>     source_transformation.setUid(uid)
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that
> sources with these formats will send UPDATE messages to downstream
> operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran <in...@gmail.com>
> wrote:
>
> Yeah, I already tried that way. However, if we do not use the DataStream
> API from the start, we cannot make use of savepoints: according to the
> docs, if we use the Table API (SQL API), the uid is generated
> automatically, which means we cannot revert if the system crashes.
>
> Best,
> Quynh
>
>
>
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in the
> Python DataStream API.
>
> Taking a further look at the Java implementations of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the result type is RowData instead of Row, so it would not be that easy
> to support them directly in the Python DataStream API. However, Flink
> supports conversion between the Table API & DataStream API [1]. Could you
> first create a Table which consumes data from Kafka and then convert it
> to a DataStream?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
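>
> A minimal sketch of that approach, assuming a StreamTableEnvironment
> `t_env` as in the sketch near the top of this thread; the topic, fields
> and connector options are placeholders:
>
> ```
> t_env.execute_sql("""
>     CREATE TABLE cdc_table (
>         id BIGINT,
>         name STRING
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'server1.inventory.customers',
>         'properties.bootstrap.servers' = 'localhost:9092',
>         'scan.startup.mode' = 'earliest-offset',
>         'format' = 'debezium-json'
>     )
> """)
> # a Debezium-format source emits updates, so convert to a changelog stream
> ds = t_env.to_changelog_stream(t_env.from_path("cdc_table"))
> ```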
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <di...@gmail.com> wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create your own versions in
> your own project. You could refer to AvroRowDeserializationSchema [1] as an
> example; it should not be complicated, as it's simply a wrapper of the
> Java implementation. (A rough sketch of the pattern follows below.)
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
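>
> A rough sketch of that wrapper pattern. The Java class
> `com.example.MyRowDeserializationSchema` is hypothetical; it stands for
> your own DeserializationSchema<Row> implementation, whose jar must be on
> the classpath (e.g. via `env.add_jars(...)`):
>
> ```
> from pyflink.common.serialization import DeserializationSchema
> from pyflink.java_gateway import get_gateway
>
> class MyRowDeserializationSchema(DeserializationSchema):
>     def __init__(self, schema_string: str):
>         # instantiate the underlying Java DeserializationSchema via Py4J
>         j_schema = get_gateway().jvm.com.example.MyRowDeserializationSchema(
>             schema_string)
>         super().__init__(j_deserialization_schema=j_schema)
> ```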
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <in...@gmail.com>
> wrote:
>
> Thanks, Dian!! I really appreciate this.
>
> However, I have another question related to this. In the current version,
> or in any future update, does the DataStream API support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink? From looking at the
> documentation, it seems they are not supported yet.
>
> Best,
> Quynh
>
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in the
> Python DataStream API in [1]. Please take a look and see if it helps~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
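>
> Roughly, the usage looks like the following sketch; the schema, topic and
> addresses are placeholders, and the flink-avro jar (with its dependencies)
> needs to be on the classpath:
>
> ```
> from pyflink.common.serialization import AvroRowDeserializationSchema
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer
>
> env = StreamExecutionEnvironment.get_execution_environment()
>
> # the Avro schema is passed as a JSON string, not a schema registry URL
> avro_schema_string = """
> {
>     "type": "record",
>     "name": "User",
>     "fields": [
>         {"name": "name", "type": "string"},
>         {"name": "age", "type": "int"}
>     ]
> }
> """
> deserialization_schema = AvroRowDeserializationSchema(
>     avro_schema_string=avro_schema_string)
> kafka_consumer = FlinkKafkaConsumer(
>     topics='my_topic',
>     deserialization_schema=deserialization_schema,
>     properties={'bootstrap.servers': 'localhost:9092',
>                 'group.id': 'my_group'})
> ds = env.add_source(kafka_consumer)
> ```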
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <di...@gmail.com> wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
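>
> (For reference: the Java AvroRowDeserializationSchema only has constructors
> taking a SpecificRecord class or an Avro schema JSON string, so the Py4J
> trace below, which shows a parsed org.apache.avro.Schema$RecordSchema being
> passed, matches neither. Constructing it with the schema string should
> avoid that:)
>
> ```
> # pass the Avro schema itself as a JSON string
> schema = AvroRowDeserializationSchema(avro_schema_string=avro_schema_string)
> ```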
>
>
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran <in...@gmail.com> wrote:
>
> Wondering whether this is a bug or not, but if I use
> *AvroRowDeserializationSchema*
> in PyFlink, the following error still occurs:
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Therefore, please help check. Thanks
> Best,
> Quynh
>
>
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *lan tran <in...@gmail.com>
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
>
>
> Hi team,
>
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).
>
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this
>  'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like
> in TableAPI ?
>
> Best,
> Quynh.
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

RE: AvroRowDeserializationSchema

Posted by lan tran <in...@gmail.com>.
Don’t expect that answer =))  
However, I am very appreciate everything you did  
Thanks again for helping me out.  
  
Best,  
Quynh.



Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for Windows



 **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
 **Sent:** Thursday, April 28, 2022 2:59 PM  
 **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
 **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
 **Subject:** Re: AvroRowDeserializationSchema



Yes, I think so~



On Thu, Apr 28, 2022 at 11:00 AM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

> Hi Dian,  
>  
> Sorry for missing your mail, so if I did as your suggestion and the Flink
> somehow crashed and we have to restart the service, does the Flink job know
> the offset where does it read from Kafka ?
>
>  
>
> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for Windows
>
>  
>
>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Tuesday, April 26, 2022 7:54 AM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>
>  
>
> Hi Quynh,  
>  
> The same code in my last reply showed how to set the UID for the source
> operator generated using Table API. I meant that you could firstly create a
> source using Table API, then convert it to a DataStream API and set uid for
> the source operator using the same code above, then perform operations with
> DataStream API.  
>  
> Regards,  
> Dian
>
>  
>
> On Mon, Apr 25, 2022 at 9:27 PM lan tran
> <[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:
>

>> Hi Dian,  
>  
> Thank again for fast response.  
>  
> As your suggestion above, we can apply to set the UID for only for the
> DataStream state (as you suggest to convert from table to data stream).  
>  
> However, at the first phase which is collecting the data from Kafka ( having
> Debezium format), the UID cannot be set since we are using Table API (auto
> generate the UID).  
>  
> Therefore, if there is some crashed or needed revert using SavePoint, we
> cannot use it in the first phase since we cannot set the UID for this => so
> how can we revert it ?.  
>  
> As a result of that, we want to use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job to be able to use
> the Savepoint for the whole full flow.  
>  
> Best,  
> Quynh
>>

>>  
>>

>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>

>>  
>>

>>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Monday, April 25, 2022 7:46 PM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>>

>>  
>>

>> Hi Quynh,  
>  
> You could try the following code (also it may be a little hacky):  
> ```
>>

>> def set_uid_for_source(ds: DataStream, uid: str):

>>

>> transformation = ds._j_data_stream.getTransformation()

>>

>>  
>>

>> source_transformation = transformation

>>

>> while not source_transformation.getInputs().isEmpty():

>>

>> source_transformation = source_transformation.getInputs().get(0)

>>

>>  
>>

>> source_transformation.setUid(uid)

>>

>> ```  
>  
> Besides, could you describe your use case a bit and also how you want to use
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.  
>  
> Regards  
> Dian
>>

>>  
>>

>> On Mon, Apr 25, 2022 at 12:31 PM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>

>>> Yeah, I already tried that way. However, if we did not use DataStream at
first. We cannot implement the Savepoint since through the doc if we use
TableAPI (SQL API), the uid is generated automatically which means we cannot
revert if the system is crashed.  
>  
> Best,  
> Quynh
>>>

>>>  
>>>

>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>

>>>  
>>>

>>>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Monday, April 25, 2022 11:04 AM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>>>

>>>  
>>>

>>> DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema are still not supported in Python
DataStream API.  
>  
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the
> results type is RowData instead of Row and so it should be not that easy to
> be directly supported in Python DataStream API. However, it supports
> conversion between Table API & DataStream API[1]. Could you firstly create a
> Table which consumes data from kafka and then convert it to a DataStream
> API?  
>  
> Regards,  
> Dian  
>  
> [1] <https://nightlies.apache.org/flink/flink-docs-
> release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-
> using-table--sql-connectors>
>>>

>>>  
>>>

>>> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu
<[dian0511.fu@gmail.com](mailto:dian0511.fu@gmail.com)> wrote:

>>>

>>>> Yes, we should support them.  
>  
> For now, if you want to use them, you could create ones in your own project.
> You could refer to AvroRowDeserializationSchema[1] as an example. It should
> not be complicated as it's simply a wrapper of the Java implementation.  
>  
> Regards,  
> Dian  
>  
> [1]
> <https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-
> python/pyflink/common/serialization.py#L308>
>>>>

>>>>  
>>>>

>>>> On Mon, Apr 25, 2022 at 11:27 AM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>>>

>>>>> Thank Dian !! Very appreciate this.  
>  
> However, I have another questions related to this. In current version or any
> updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.  
>  
> Best,  
> Quynh
>>>>>

>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>

>>>>>  
>>>>>

>>>>>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Friday, April 22, 2022 9:36 PM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>>>>>

>>>>>  
>>>>>

>>>>> Hi Quynh,  
>  
> I have added an example on how to use AvroRowDeserializationSchema in Python
> DataStream API in [1]. Please take a look at if that helps for you~  
>  
> Regards,  
> Dian  
>  
> [1] <https://github.com/apache/flink/blob/release-1.15/flink-
> python/pyflink/examples/datastream/formats/avro_format.py>
>>>>>

>>>>>  
>>>>>

>>>>> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu
<[dian0511.fu@gmail.com](mailto:dian0511.fu@gmail.com)> wrote:

>>>>>

>>>>>> Hi Quynh,  
>  
> Could you show some sample code on how you use it?  
>  
> Regards,  
> Dian
>>>>>>

>>>>>>  
>>>>>>

>>>>>> On Fri, Apr 22, 2022 at 1:42 PM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>>>>>

>>>>>>> Wonder if this is a bug or not but if I use
**AvroRowDeserializationSchema,**

>>>>>>>

>>>>>>> In PyFlink the error still occure ?  
>  
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>>>>>>>

>>>>>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
org.apache.avro.Schema$RecordSchema]) does not exist  
>  
> Therefore, please help check. Thanks  
> Best,  
> Quynh
>>>>>>>

>>>>>>>  
>>>>>>>

>>>>>>>  
>>>>>>>

>>>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>>>

>>>>>>>  
>>>>>>>

>>>>>>>  **From:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Sent:** Thursday, April 21, 2022 1:43 PM  
>  **To:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** AvroRowDeserializationSchema
>>>>>>>

>>>>>>>  
>>>>>>>

>>>>>>> Hi team,  
>  
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).  
>  
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this  'debezium-avro-confluent.schema-
> registry.url'='{schema_registry_url}' like in TableAPI ?  
>  
> Best,  
> Quynh.
>>>>>>>

>>>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>>>

>>>>>>>  
>>>>>

>>>>>  
>>>

>>>  
>>

>>  
>
>  






Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Yes, I think so~

On Thu, Apr 28, 2022 at 11:00 AM lan tran <in...@gmail.com> wrote:

> Hi Dian,
>
> Sorry for missing your mail, so if I did as your suggestion and the Flink
> somehow crashed and we have to restart the service, does the Flink job know
> the offset where does it read from Kafka ?
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Tuesday, April 26, 2022 7:54 AM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> The same code in my last reply showed how to set the UID for the source
> operator generated using Table API. I meant that you could firstly create a
> source using Table API, then convert it to a DataStream API and set uid for
> the source operator using the same code above, then perform operations with
> DataStream API.
>
> Regards,
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 9:27 PM lan tran <in...@gmail.com> wrote:
>
> Hi Dian,
>
> Thank again for fast response.
>
> As your suggestion above, we can apply to set the UID for only for the
> DataStream state (as you suggest to convert from table to data stream).
>
> However, at the first phase which is collecting the data from Kafka (
> having Debezium format), the UID cannot be set since we are using Table API
> (auto generate the UID).
>
> Therefore, if there is some crashed or needed revert using SavePoint, we
> cannot use it in the first phase since we cannot set the UID for this => so
> how can we revert it ?.
>
> As a result of that, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job to be able
> to use the Savepoint for the whole full flow.
>
> Best,
> Quynh
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (also it may be a little hacky):
> ```
>
> def set_uid_for_source(ds: DataStream, uid: str):
>
> transformation = ds._j_data_stream.getTransformation()
>
>
>
> source_transformation = transformation
>
> while not source_transformation.getInputs().isEmpty():
>
> source_transformation = source_transformation.getInputs().get(0)
>
>
>
> source_transformation.setUid(uid)
>
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran <in...@gmail.com>
> wrote:
>
> Yeah, I already tried that way. However, if we did not use DataStream at
> first. We cannot implement the Savepoint since through the doc if we use
> TableAPI (SQL API), the uid is generated automatically which means we
> cannot revert if the system is crashed.
>
> Best,
> Quynh
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the results type is RowData instead of Row and so it should be not that
> easy to be directly supported in Python DataStream API. However, it
> supports conversion between Table API & DataStream API[1]. Could you
> firstly create a Table which consumes data from kafka and then convert it
> to a DataStream API?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <di...@gmail.com> wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create ones in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <in...@gmail.com>
> wrote:
>
> Thank Dian !! Very appreciate this.
>
> However, I have another questions related to this. In current version or
> any updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.
>
> Best,
> Quynh
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in
> Python DataStream API in [1]. Please take a look at if that helps for you~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <di...@gmail.com> wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
>
>
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran <in...@gmail.com> wrote:
>
> Wonder if this is a bug or not but if I use
> *AvroRowDeserializationSchema,*
>
> In PyFlink the error still occure ?
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Therefore, please help check. Thanks
> Best,
> Quynh
>
>
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *lan tran <in...@gmail.com>
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
>
>
> Hi team,
>
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).
>
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this
>  'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like
> in TableAPI ?
>
> Best,
> Quynh.
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
>
>
>
>
>
>
>
>
>
>

RE: AvroRowDeserializationSchema

Posted by lan tran <in...@gmail.com>.
Hi Dian,  
  
Sorry for missing your mail, so if I did as your suggestion and the Flink
somehow crashed and we have to restart the service, does the Flink job know
the offset where does it read from Kafka ?



Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for Windows



 **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
 **Sent:** Tuesday, April 26, 2022 7:54 AM  
 **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
 **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
 **Subject:** Re: AvroRowDeserializationSchema



Hi Quynh,  
  
The same code in my last reply showed how to set the UID for the source
operator generated using Table API. I meant that you could firstly create a
source using Table API, then convert it to a DataStream API and set uid for
the source operator using the same code above, then perform operations with
DataStream API.  
  
Regards,  
Dian



On Mon, Apr 25, 2022 at 9:27 PM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

> Hi Dian,  
>  
> Thank again for fast response.  
>  
> As your suggestion above, we can apply to set the UID for only for the
> DataStream state (as you suggest to convert from table to data stream).  
>  
> However, at the first phase which is collecting the data from Kafka ( having
> Debezium format), the UID cannot be set since we are using Table API (auto
> generate the UID).  
>  
> Therefore, if there is some crashed or needed revert using SavePoint, we
> cannot use it in the first phase since we cannot set the UID for this => so
> how can we revert it ?.  
>  
> As a result of that, we want to use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job to be able to use
> the Savepoint for the whole full flow.  
>  
> Best,  
> Quynh
>
>  
>
> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for Windows
>
>  
>
>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Monday, April 25, 2022 7:46 PM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>
>  
>
> Hi Quynh,  
>  
> You could try the following code (also it may be a little hacky):  
> ```
>
> def set_uid_for_source(ds: DataStream, uid: str):
>
> transformation = ds._j_data_stream.getTransformation()
>
>  
>
> source_transformation = transformation
>
> while not source_transformation.getInputs().isEmpty():
>
> source_transformation = source_transformation.getInputs().get(0)
>
>  
>
> source_transformation.setUid(uid)
>
> ```  
>  
> Besides, could you describe your use case a bit and also how you want to use
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.  
>  
> Regards  
> Dian
>
>  
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran
> <[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:
>

>> Yeah, I already tried that way. However, if we did not use DataStream at
first. We cannot implement the Savepoint since through the doc if we use
TableAPI (SQL API), the uid is generated automatically which means we cannot
revert if the system is crashed.  
>  
> Best,  
> Quynh
>>

>>  
>>

>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>

>>  
>>

>>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Monday, April 25, 2022 11:04 AM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>>

>>  
>>

>> DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema are still not supported in Python
DataStream API.  
>  
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the
> results type is RowData instead of Row and so it should be not that easy to
> be directly supported in Python DataStream API. However, it supports
> conversion between Table API & DataStream API[1]. Could you firstly create a
> Table which consumes data from kafka and then convert it to a DataStream
> API?  
>  
> Regards,  
> Dian  
>  
> [1] <https://nightlies.apache.org/flink/flink-docs-
> release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-
> using-table--sql-connectors>
>>

>>  
>>

>> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu
<[dian0511.fu@gmail.com](mailto:dian0511.fu@gmail.com)> wrote:

>>

>>> Yes, we should support them.  
>  
> For now, if you want to use them, you could create ones in your own project.
> You could refer to AvroRowDeserializationSchema[1] as an example. It should
> not be complicated as it's simply a wrapper of the Java implementation.  
>  
> Regards,  
> Dian  
>  
> [1]
> <https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-
> python/pyflink/common/serialization.py#L308>
>>>

>>>  
>>>

>>> On Mon, Apr 25, 2022 at 11:27 AM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>>

>>>> Thank Dian !! Very appreciate this.  
>  
> However, I have another questions related to this. In current version or any
> updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.  
>  
> Best,  
> Quynh
>>>>

>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>

>>>>  
>>>>

>>>>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Friday, April 22, 2022 9:36 PM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>>>>

>>>>  
>>>>

>>>> Hi Quynh,  
>  
> I have added an example on how to use AvroRowDeserializationSchema in Python
> DataStream API in [1]. Please take a look at if that helps for you~  
>  
> Regards,  
> Dian  
>  
> [1] <https://github.com/apache/flink/blob/release-1.15/flink-
> python/pyflink/examples/datastream/formats/avro_format.py>
>>>>

>>>>  
>>>>

>>>> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu
<[dian0511.fu@gmail.com](mailto:dian0511.fu@gmail.com)> wrote:

>>>>

>>>>> Hi Quynh,  
>  
> Could you show some sample code on how you use it?  
>  
> Regards,  
> Dian
>>>>>

>>>>>  
>>>>>

>>>>> On Fri, Apr 22, 2022 at 1:42 PM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>>>>

>>>>>> Wonder if this is a bug or not but if I use
**AvroRowDeserializationSchema,**

>>>>>>

>>>>>> In PyFlink the error still occure ?  
>  
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>>>>>>

>>>>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
org.apache.avro.Schema$RecordSchema]) does not exist  
>  
> Therefore, please help check. Thanks  
> Best,  
> Quynh
>>>>>>

>>>>>>  
>>>>>>

>>>>>>  
>>>>>>

>>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>>

>>>>>>  
>>>>>>

>>>>>>  **From:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Sent:** Thursday, April 21, 2022 1:43 PM  
>  **To:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** AvroRowDeserializationSchema
>>>>>>

>>>>>>  
>>>>>>

>>>>>> Hi team,  
>  
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).  
>  
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this  'debezium-avro-confluent.schema-
> registry.url'='{schema_registry_url}' like in TableAPI ?  
>  
> Best,  
> Quynh.
>>>>>>

>>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>>

>>>>>>  
>>>>

>>>>  
>>

>>  
>
>  






Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Hi Quynh,

The same code in my last reply showed how to set the UID for the source
operator generated using Table API. I meant that you could firstly create a
source using Table API, then convert it to a DataStream API and set uid for
the source operator using the same code above, then perform operations with
DataStream API.

Regards,
Dian

On Mon, Apr 25, 2022 at 9:27 PM lan tran <in...@gmail.com> wrote:

> Hi Dian,
>
> Thank again for fast response.
>
> As your suggestion above, we can apply to set the UID for only for the
> DataStream state (as you suggest to convert from table to data stream).
>
> However, at the first phase which is collecting the data from Kafka (
> having Debezium format), the UID cannot be set since we are using Table API
> (auto generate the UID).
>
> Therefore, if there is some crashed or needed revert using SavePoint, we
> cannot use it in the first phase since we cannot set the UID for this => so
> how can we revert it ?.
>
> As a result of that, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job to be able
> to use the Savepoint for the whole full flow.
>
> Best,
> Quynh
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (also it may be a little hacky):
> ```
>
> def set_uid_for_source(ds: DataStream, uid: str):
>
> transformation = ds._j_data_stream.getTransformation()
>
>
>
> source_transformation = transformation
>
> while not source_transformation.getInputs().isEmpty():
>
> source_transformation = source_transformation.getInputs().get(0)
>
>
>
> source_transformation.setUid(uid)
>
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran <in...@gmail.com>
> wrote:
>
> Yeah, I already tried that way. However, if we did not use DataStream at
> first. We cannot implement the Savepoint since through the doc if we use
> TableAPI (SQL API), the uid is generated automatically which means we
> cannot revert if the system is crashed.
>
> Best,
> Quynh
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the results type is RowData instead of Row and so it should be not that
> easy to be directly supported in Python DataStream API. However, it
> supports conversion between Table API & DataStream API[1]. Could you
> firstly create a Table which consumes data from kafka and then convert it
> to a DataStream API?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <di...@gmail.com> wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create ones in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <in...@gmail.com>
> wrote:
>
> Thank Dian !! Very appreciate this.
>
> However, I have another questions related to this. In current version or
> any updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.
>
> Best,
> Quynh
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in
> Python DataStream API in [1]. Please take a look at if that helps for you~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <di...@gmail.com> wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
>
>
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran <in...@gmail.com> wrote:
>
> Wonder if this is a bug or not but if I use
> *AvroRowDeserializationSchema,*
>
> In PyFlink the error still occure ?
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Therefore, please help check. Thanks
> Best,
> Quynh
>
>
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *lan tran <in...@gmail.com>
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
>
>
> Hi team,
>
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).
>
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this
>  'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like
> in TableAPI ?
>
> Best,
> Quynh.
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
>
>
>
>
>
>
>
>

RE: AvroRowDeserializationSchema

Posted by lan tran <in...@gmail.com>.
Hi Dian,  
  
Thank again for fast response.  
  
As your suggestion above, we can apply to set the UID for only for the
DataStream state (as you suggest to convert from table to data stream).  
  
However, at the first phase which is collecting the data from Kafka ( having
Debezium format), the UID cannot be set since we are using Table API (auto
generate the UID).  
  
Therefore, if there is some crashed or needed revert using SavePoint, we
cannot use it in the first phase since we cannot set the UID for this => so
how can we revert it ?.  
  
As a result of that, we want to use DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema in the DataStream job to be able to use
the Savepoint for the whole full flow.  
  
Best,  
Quynh



Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for Windows



 **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
 **Sent:** Monday, April 25, 2022 7:46 PM  
 **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
 **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
 **Subject:** Re: AvroRowDeserializationSchema



Hi Quynh,  
  
You could try the following code (also it may be a little hacky):  
```

def set_uid_for_source(ds: DataStream, uid: str):

transformation = ds._j_data_stream.getTransformation()



source_transformation = transformation

while not source_transformation.getInputs().isEmpty():

source_transformation = source_transformation.getInputs().get(0)



source_transformation.setUid(uid)

```  
  
Besides, could you describe your use case a bit and also how you want to use
DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema
in the DataStream job? Note that for the sources with these formats, it will
send UPDATE messages to downstream operators.  
  
Regards  
Dian



On Mon, Apr 25, 2022 at 12:31 PM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

> Yeah, I already tried that way. However, if we did not use DataStream at
> first. We cannot implement the Savepoint since through the doc if we use
> TableAPI (SQL API), the uid is generated automatically which means we cannot
> revert if the system is crashed.  
>  
> Best,  
> Quynh
>
>  
>
> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for Windows
>
>  
>
>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Monday, April 25, 2022 11:04 AM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>
>  
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in Python
> DataStream API.  
>  
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the
> results type is RowData instead of Row and so it should be not that easy to
> be directly supported in Python DataStream API. However, it supports
> conversion between Table API & DataStream API[1]. Could you firstly create a
> Table which consumes data from kafka and then convert it to a DataStream
> API?  
>  
> Regards,  
> Dian  
>  
> [1] <https://nightlies.apache.org/flink/flink-docs-
> release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-
> using-table--sql-connectors>
>
>  
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu
> <[dian0511.fu@gmail.com](mailto:dian0511.fu@gmail.com)> wrote:
>

>> Yes, we should support them.  
>  
> For now, if you want to use them, you could create ones in your own project.
> You could refer to AvroRowDeserializationSchema[1] as an example. It should
> not be complicated as it's simply a wrapper of the Java implementation.  
>  
> Regards,  
> Dian  
>  
> [1]
> <https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-
> python/pyflink/common/serialization.py#L308>
>>

>>  
>>

>> On Mon, Apr 25, 2022 at 11:27 AM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>

>>> Thank Dian !! Very appreciate this.  
>  
> However, I have another questions related to this. In current version or any
> updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.  
>  
> Best,  
> Quynh
>>>

>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>

>>>  
>>>

>>>  **From:**[Dian Fu](mailto:dian0511.fu@gmail.com)  
>  **Sent:** Friday, April 22, 2022 9:36 PM  
>  **To:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Cc:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** Re: AvroRowDeserializationSchema
>>>

>>>  
>>>

>>> Hi Quynh,  
>  
> I have added an example on how to use AvroRowDeserializationSchema in Python
> DataStream API in [1]. Please take a look at if that helps for you~  
>  
> Regards,  
> Dian  
>  
> [1] <https://github.com/apache/flink/blob/release-1.15/flink-
> python/pyflink/examples/datastream/formats/avro_format.py>
>>>

>>>  
>>>

>>> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu
<[dian0511.fu@gmail.com](mailto:dian0511.fu@gmail.com)> wrote:

>>>

>>>> Hi Quynh,  
>  
> Could you show some sample code on how you use it?  
>  
> Regards,  
> Dian
>>>>

>>>>  
>>>>

>>>> On Fri, Apr 22, 2022 at 1:42 PM lan tran
<[indigoblue7075@gmail.com](mailto:indigoblue7075@gmail.com)> wrote:

>>>>

>>>>> Wonder if this is a bug or not but if I use
**AvroRowDeserializationSchema,**

>>>>>

>>>>> In PyFlink the error still occure ?  
>  
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>>>>>

>>>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
org.apache.avro.Schema$RecordSchema]) does not exist  
>  
> Therefore, please help check. Thanks  
> Best,  
> Quynh
>>>>>

>>>>>  
>>>>>

>>>>>  
>>>>>

>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>

>>>>>  
>>>>>

>>>>>  **From:**[lan tran](mailto:indigoblue7075@gmail.com)  
>  **Sent:** Thursday, April 21, 2022 1:43 PM  
>  **To:**[user@flink.apache.org](mailto:user@flink.apache.org)  
>  **Subject:** AvroRowDeserializationSchema
>>>>>

>>>>>  
>>>>>

>>>>> Hi team,  
>  
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).  
>  
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this  'debezium-avro-confluent.schema-
> registry.url'='{schema_registry_url}' like in TableAPI ?  
>  
> Best,  
> Quynh.
>>>>>

>>>>> Sent from [Mail](https://go.microsoft.com/fwlink/?LinkId=550986) for
Windows

>>>>>

>>>>>  
>>>

>>>  
>
>  






Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Hi Quynh,

You could try the following code (also it may be a little hacky):
```
def set_uid_for_source(ds: DataStream, uid: str):
transformation = ds._j_data_stream.getTransformation()

source_transformation = transformation
while not source_transformation.getInputs().isEmpty():
source_transformation = source_transformation.getInputs().get(0)

source_transformation.setUid(uid)
```

Besides, could you describe your use case a bit and also how you want to
use DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
the sources with these formats, it will send UPDATE messages to downstream
operators.

Regards
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran <in...@gmail.com> wrote:

> Yeah, I already tried that way. However, if we did not use DataStream at
> first. We cannot implement the Savepoint since through the doc if we use
> TableAPI (SQL API), the uid is generated automatically which means we
> cannot revert if the system is crashed.
>
> Best,
> Quynh
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> Just take a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the results type is RowData instead of Row and so it should be not that
> easy to be directly supported in Python DataStream API. However, it
> supports conversion between Table API & DataStream API[1]. Could you
> firstly create a Table which consumes data from kafka and then convert it
> to a DataStream API?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <di...@gmail.com> wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create ones in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <in...@gmail.com>
> wrote:
>
> Thank Dian !! Very appreciate this.
>
> However, I have another questions related to this. In current version or
> any updating in future, does DataStream support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink ? Since I look at the
> documentation and seem it is not supported yet.
>
> Best,
> Quynh
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *Dian Fu <di...@gmail.com>
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran <in...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in
> Python DataStream API in [1]. Please take a look at if that helps for you~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <di...@gmail.com> wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
>
>
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran <in...@gmail.com> wrote:
>
> Wonder if this is a bug or not but if I use
> *AvroRowDeserializationSchema,*
>
> In PyFlink the error still occure ?
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Therefore, please help check. Thanks
> Best,
> Quynh
>
>
>
>
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
> *From: *lan tran <in...@gmail.com>
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
>
>
> Hi team,
>
> I want to implement AvroRowDeserializationSchema when consume data from
> Kafka, however from the documentation, I did not understand what are
> avro_schema_string and record_class ? I would be great if you can give me
> the example on this (I only have the example on Java, however, I was doing
> it using PyFlink ).
>
> As my understanding avro_schema_string is schema_registry_url ? Does it
> support this
>  'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like
> in TableAPI ?
>
> Best,
> Quynh.
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for
> Windows
>
>
>
>
>
>
>
>
>

RE: AvroRowDeserializationSchema

Posted by lan tran <in...@gmail.com>.
Yeah, I already tried that way. However, if we did not use DataStream at
first. We cannot implement the Savepoint since through the doc if we use
TableAPI (SQL API), the uid is generated automatically which means we cannot
revert if the system is crashed.  
  
Best,  
Quynh




Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema are still not supported in the
Python DataStream API.

Taking a further look at the Java implementations of
DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
their result type is RowData instead of Row, so they will not be that easy
to support directly in the Python DataStream API. However, PyFlink supports
conversion between the Table API & the DataStream API [1]. Could you first
create a Table which consumes data from Kafka and then convert it to a
DataStream?

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
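
A minimal sketch of that approach might look like the following (the topic
name, fields, bootstrap servers and schema registry URL are placeholders, and
it assumes the Kafka connector and avro-confluent format JARs are on the
classpath):

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Hypothetical table over a Debezium/Avro Kafka topic.
    t_env.execute_sql("""
        CREATE TABLE orders (
            id BIGINT,
            name STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'debezium-avro-confluent',
            'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081'
        )
    """)

    # Debezium emits updates and deletes as well as inserts, so convert to a
    # retract stream of (is_add, row) pairs rather than an append-only stream.
    ds = t_env.to_retract_stream(
        t_env.from_path("orders"),
        Types.ROW([Types.LONG(), Types.STRING()]))
    ds.print()
    env.execute()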


Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Yes, we should support them.

For now, if you want to use them, you could create them in your own
project. You could refer to AvroRowDeserializationSchema [1] as an example.
It should not be complicated, as it's simply a wrapper around the
Java implementation.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
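
As an illustration of that wrapper pattern, a minimal sketch might look like
this (the class name is hypothetical; note that the Java constructor takes the
Avro schema as a JSON string, which is also why the Py4J error quoted
elsewhere in this thread appears when a parsed Schema$RecordSchema object is
passed instead):

    from pyflink.common.serialization import DeserializationSchema
    from pyflink.java_gateway import get_gateway

    class MyAvroRowDeserializationSchema(DeserializationSchema):
        """Thin wrapper around the Java deserialization schema, following
        the pattern of pyflink.common.serialization's
        AvroRowDeserializationSchema."""

        def __init__(self, avro_schema_string: str):
            gateway = get_gateway()
            # Pass the schema as a JSON string; there is no Java constructor
            # taking a parsed org.apache.avro.Schema.
            j_schema = gateway.jvm.org.apache.flink.formats.avro \
                .AvroRowDeserializationSchema(avro_schema_string)
            super().__init__(j_deserialization_schema=j_schema)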


RE: AvroRowDeserializationSchema

Posted by lan tran <in...@gmail.com>.
Thanks, Dian!! I really appreciate this.

However, I have another question related to this: does the DataStream API in
PyFlink support DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema, either in the current version or in a
future update? I looked at the documentation and it seems they are not
supported yet.

Best,
Quynh


Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Hi Quynh,

I have added an example of how to use AvroRowDeserializationSchema in the
Python DataStream API in [1]. Please take a look and see if that helps~

Regards,
Dian

[1]
https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
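
To clarify the two parameters from the original question: as far as I
understand, avro_schema_string is the Avro schema itself as a JSON string (not
a schema registry URL), and record_class is the fully qualified name of a
generated Avro SpecificRecord class on the Java classpath. The
'debezium-avro-confluent.schema-registry.url' option belongs to the Table API
format, not to this class. A condensed sketch along the lines of the example
in [1] (the topic, group id and record fields are placeholders, assuming the
flink-sql-avro and Kafka connector JARs are on the classpath):

    from pyflink.common.serialization import AvroRowDeserializationSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    env = StreamExecutionEnvironment.get_execution_environment()

    # The Avro schema itself, as a JSON string.
    avro_schema_string = """
    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"}
        ]
    }
    """

    deserialization_schema = AvroRowDeserializationSchema(
        avro_schema_string=avro_schema_string)

    kafka_consumer = FlinkKafkaConsumer(
        topics='users',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9092',
                    'group.id': 'my-group'})

    env.add_source(kafka_consumer).print()
    env.execute()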


Re: AvroRowDeserializationSchema

Posted by Dian Fu <di...@gmail.com>.
Hi Quynh,

Could you show some sample code on how you use it?

Regards,
Dian


RE: AvroRowDeserializationSchema

Posted by lan tran <in...@gmail.com>.
I wonder whether this is a bug or not, but when I use
**AvroRowDeserializationSchema** in PyFlink, the following error still occurs:

py4j.protocol.Py4JError: An error occurred while calling
None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:

org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
org.apache.avro.Schema$RecordSchema]) does not exist

Therefore, please help check. Thanks.
Best,
Quynh




