Posted to user@flink.apache.org by Rodrigo de Souza Oliveira Brochado <ro...@predito.com.br> on 2020/08/10 23:29:33 UTC

Avro format in pyFlink

Hi guys,

Could you help me set up a Kafka sink connector using the Avro format? I'm
using pyFlink 1.11 and OpenJDK 11, and running the job locally (python
my_script.py).

My code is simple: it calls a UDTF that, for now, only yields a small fixed
binary value. I use Kafka connectors for both the source and the sink. The
source uses the Json format, which works as expected. The sink works with
the Csv format:

# Imports added for completeness; t_env is an existing TableEnvironment.
from pyflink.table import DataTypes
from pyflink.table.descriptors import Csv, Kafka, Schema

(t_env.connect(  # declare the external system to connect to
        Kafka()
        .version("universal")
        .topic("output")
        .property("bootstrap.servers", "localhost:9092")
        .property("zookeeper.connect", "localhost:2181"))
    .with_format(  # declare a format for this system
        Csv())  # Csv converts bytes to a base64 string
    .with_schema(  # declare the schema of the table
        Schema()
        .field("outf", DataTypes.BYTES()))
    .create_temporary_table("mySink"))

The Csv format converts the bytes (VARBINARY) to a base64 string. That is
the expected behavior, but it is not what I want.
With the Avro format, I just get errors:

- Just replacing Csv() with Avro() in the code above, and adding Avro's
dependency with
    conf = t_env.get_config().get_configuration()
    conf.set_string(
        "pipeline.jars",
        "file://<path_to_kafka_connector>.jar;file://<path_to_kafka_connector>.jar")
I got:
*org.apache.flink.table.api.ValidationException: A definition of an Avro
specific record class or Avro schema is required.*

- After looking at the pyFlink source code, I also passed an avro_schema
argument to the constructor, Avro(avro_schema=<my_schema_in_string>), and
got:
*java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter*

- Using the SQL (DDL) declaration documented in [1], I got the same
ClassNotFoundException.

I also have some questions about how to create the schema, but I need to get
Avro working first.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html

Thanks,
Rodrigo

Re: Avro format in pyFlink

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Rodrigo,
For the connectors, PyFlink just wraps the Java implementation.
I am not an expert on Avro and the corresponding connectors, but as far as
I know, DataTypes really cannot declare the union type you mentioned.
Regarding the bytes encoding you mentioned, I don't have any good
suggestions.
I think we need an Avro expert to answer your question.

Best,
Xingbo

On Fri, Aug 14, 2020 at 10:07 AM, rodrigobrochado <ro...@predito.com.br> wrote:

> The upload of the schema through Avro(avro_schema) worked, but I had to
> select one type from the union type to put in Schema.field(field_type)
> inside t_env.connect(). If my dict has long and double values and I
> declare Schema.field(DataTypes.DOUBLE()), all the int values are cast to
> double. My maps will also have string values, and the job will crash with
> this configuration.
>
> Is there any workaround? If not, I thought of serializing it in the UDTF
> using the Python avro lib and sending it as bytes to the sink. The problem
> is that all serialization formats change the original schema: the CSV
> format uses base64 encoding for bytes; the JSON format adds a key, to form
> a key/value pair whose value is the binary; and the Avro format adds 3
> bytes at the beginning of the message.
>
> Thanks,
> Rodrigo

Re: Avro format in pyFlink

Posted by rodrigobrochado <ro...@predito.com.br>.
The upload of the schema through Avro(avro_schema) worked, but I had to
select one type from the union type to put in Schema.field(field_type)
inside t_env.connect(). If my dict has long and double values and I declare
Schema.field(DataTypes.DOUBLE()), all the int values are cast to double. My
maps will also have string values, and the job will crash with this
configuration.

Is there any workaround? If not, I thought of serializing it in the UDTF
using the Python avro lib and sending it as bytes to the sink. The problem
is that all serialization formats change the original schema: the CSV format
uses base64 encoding for bytes; the JSON format adds a key, to form a
key/value pair whose value is the binary; and the Avro format adds 3
bytes at the beginning of the message.
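
The effects described above can be reproduced with the standard library alone. The sketch below is only an illustration under stated assumptions: it base64-encodes a payload the way a text format has to, and it hand-encodes a single field following the Avro binary encoding rules (zigzag varints, length-prefixed bytes, union branch index). That the sink writes the column as a nullable field, i.e. the union ["null", "bytes"], is a guess that has not been checked against Flink. If it holds, any payload of 64 to 8191 bytes gets exactly 3 leading bytes: 1 for the union branch and 2 for the length. The function names are made up for this example.

```python
import base64


def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro does for int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negatives to small positives
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_nullable_bytes(payload: bytes) -> bytes:
    """Encode payload as the second branch of the union ["null", "bytes"].

    ASSUMPTION: the nullable-union layout is a guess about what the sink
    does; it is not confirmed anywhere in this thread.
    """
    branch = zigzag_varint(1)             # index of "bytes" in the union
    length = zigzag_varint(len(payload))  # Avro bytes are length-prefixed
    return branch + length + payload


payload = bytes(range(100))  # stand-in for the binary yielded by the UDTF

as_text = base64.b64encode(payload).decode("ascii")  # what a text format emits
as_avro = encode_nullable_bytes(payload)

print(len(payload), len(as_text), len(as_avro))
print(as_avro[:3].hex())
```

For the 100-byte stand-in payload, the base64 text is 136 characters long, and the Avro-style encoding is 103 bytes long and starts with 02 c8 01, followed by the untouched payload.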

Thanks,
Rodrigo



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Avro format in pyFlink

Posted by Rodrigo de Souza Oliveira Brochado <ro...@predito.com.br>.
Thank you Xingbo.

I've managed to get it working by adding the Avro jar and the three artifacts
from the *com.fasterxml.jackson.core* group [1]. Is it also required to add
the jackson-mapper-asl jar? As for joda-time, I suppose it won't be
required, as I won't use date types in my Avro schema.

About using Avro, I'd like to know whether pyFlink supports the Avro union
type. I found this old e-mail [2] that mentions it, but for Java. If pyFlink
supports it, how would I declare the schema? Can I define the schema in an
external .avsc file and import it, maybe through Avro(avro_schema)?

[1] https://search.maven.org/search?q=g:com.fasterxml.jackson.core
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/AVRO-Union-type-support-in-Flink-td14318.html
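
As a rough sketch of the external-file idea: an .avsc file is plain JSON, so it can be read into a string and handed to Avro(avro_schema=...). The record name, the field name, the file name, and the load_avsc helper below are all hypothetical, chosen only to show a map whose values are a union. Whether the table schema declared with DataTypes can express such a union is a separate question that this sketch does not answer.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical schema: one map field whose values are a union of three types.
SCHEMA = {
    "type": "record",
    "name": "Output",
    "fields": [
        {
            "name": "measurements",
            "type": {"type": "map", "values": ["long", "double", "string"]},
        },
    ],
}


def load_avsc(path):
    """Read an .avsc file and return its contents as a compact JSON string."""
    text = Path(path).read_text(encoding="utf-8")
    return json.dumps(json.loads(text))  # the round trip also validates the JSON


# Write the schema to a temporary .avsc file, then load it back as a string.
with tempfile.TemporaryDirectory() as tmp:
    avsc_path = Path(tmp) / "output.avsc"
    avsc_path.write_text(json.dumps(SCHEMA, indent=2), encoding="utf-8")
    schema_string = load_avsc(avsc_path)

# schema_string is what would be passed as Avro(avro_schema=schema_string).
print(schema_string)
```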

Thanks,
Rodrigo

On Tue, Aug 11, 2020 at 12:09 AM Xingbo Huang <hx...@gmail.com> wrote:

> Hi Rodrigo,
>
> In release-1.11, Flink doesn't provide an Avro uber jar, so you need to
> add all the dependency jars manually, such as avro, jackson-core-asl,
> jackson-mapper-asl and joda-time.
> However, I found a JIRA issue [1] from a few days ago that provides a
> default Avro uber jar.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-18802
>
> Best,
> Xingbo

-- 

*Rodrigo S. Oliveira Brochado*
*Data Scientist*
rodrigo.brochado@predito.com.br
+55 31 99463-2014 <+553199463-2014>
www.predito.com.br <https://predito.com.br/>

Re: Avro format in pyFlink

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Rodrigo,

In release-1.11, Flink doesn't provide an Avro uber jar, so you need to
add all the dependency jars manually, such as avro, jackson-core-asl,
jackson-mapper-asl and joda-time.
However, I found a JIRA issue [1] from a few days ago that provides a
default Avro uber jar.


[1] https://issues.apache.org/jira/browse/FLINK-18802
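
The manual approach can be scripted. Here is a minimal sketch, assuming the jars have already been downloaded into a local directory. The directory /opt/flink-jars and the version-less file names are placeholders invented for this example, and build_pipeline_jars and register_jars are hypothetical helpers, not part of pyFlink. Only the get_config().get_configuration() and set_string("pipeline.jars", ...) calls come from the original message.

```python
from pathlib import Path

# Placeholder locations; replace them with the real, versioned jar files.
JAR_DIR = Path("/opt/flink-jars")
JAR_NAMES = [
    "avro.jar",
    "jackson-core-asl.jar",
    "jackson-mapper-asl.jar",
    "joda-time.jar",
]


def build_pipeline_jars(jar_paths):
    """Join absolute jar paths into one ';'-separated string of file:// URLs.

    Path.as_uri() raises ValueError for relative paths, so a typo such as
    a missing leading slash fails early instead of at job submission.
    """
    return ";".join(Path(p).as_uri() for p in jar_paths)


def register_jars(t_env, jar_paths):
    """Set pipeline.jars on an existing TableEnvironment (t_env)."""
    conf = t_env.get_config().get_configuration()
    conf.set_string("pipeline.jars", build_pipeline_jars(jar_paths))


pipeline_jars = build_pipeline_jars(JAR_DIR / name for name in JAR_NAMES)
print(pipeline_jars)
```

In a job script, register_jars(t_env, [JAR_DIR / name for name in JAR_NAMES]) would be called before the sink table is created; the Kafka connector jar would need to be added to the same list.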

Best,
Xingbo
