You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by gu...@foxmail.com on 2022/03/09 07:10:00 UTC

Problem about adding custom kryo serializer

Hi,

 

I have an entity class built by Google Flatbuf, to raise the performance, I
have tried written a serializer class.

 

public class TransactionSerializer extends Serializer<Transaction> {

    @Override

    public void write(Kryo kryo, Output output, Transaction transaction) {

        ByteBuffer byteBuffer = transaction.getByteBuffer();

        byte[] generated = new byte[byteBuffer.remaining()];

        byteBuffer.get(generated);

        output.writeInt(generated.length, true);

        output.writeBytes(generated);

    }

 

    @Override

    public Transaction read(Kryo kryo, Input input, Class<Transaction>
aClass) {

        int size = input.readInt(true);

        byte[] barr = new byte[size];

        input.readBytes(barr);

        ByteBuffer buf = ByteBuffer.wrap(barr);

        return Transaction.getRootAsTransaction(buf);

    }

}

 

And register it to the runtime env before calling env.execute.

 

env.registerTypeWithKryoSerializer(Transaction.class,
TransactionSerializer.class);

env.getConfig().addDefaultKryoSerializer(Transaction.class,
TransactionSerializer.class);

 

 

After that, I executed my job, however, I can see the log like this.

 

2021-11-18 10:35:07,624 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - class
org.ff.dto.flatbuff.Transaction does not contain a setter for field bb_pos

2021-11-18 10:35:07,624 INFO
org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class
class org.ff.dto.flatbuff.Transaction cannot be used as a POJO type because
not all fields are valid POJO fields, and must be processed as GenericType.
Please read the Flink documentation on "Data Types & Serialization" for
details of the effect on performance.

 

It looks like the serializer is not working at all. So what's the problem
about this? I register the serializer in a wrong way? Or do I need to move
the class to somewhere to make the flink classloader recognize it?

 

Thanks in advance.


Re: Problem about adding custom kryo serializer

Posted by Chesnay Schepler <ch...@apache.org>.
Sounds correct to me; it's not a POJO so it is treated as a generic 
type, which go through Kryo.

If you want to be doubly-sure that your serializer is in fact used, add 
a log statement to the read/write methods.

On 09/03/2022 08:10, guoliubin85@foxmail.com wrote:
>
> Hi,
>
> I have an entity class built by Google Flatbuf, to raise the 
> performance, I have tried written a serializer class.
>
> public class TransactionSerializer extends Serializer<Transaction> {
>
>     @Override
>
>     public void write(Kryo kryo, Output output, Transaction transaction) {
>
>         ByteBuffer byteBuffer = transaction.getByteBuffer();
>
>         byte[] generated = new byte[byteBuffer.remaining()];
>
> byteBuffer.get(generated);
>
> output.writeInt(generated.length, true);
>
> output.writeBytes(generated);
>
>     }
>
>     @Override
>
>     public Transaction read(Kryo kryo, Input input, Class<Transaction> 
> aClass) {
>
>         int size = input.readInt(true);
>
>         byte[] barr = new byte[size];
>
> input.readBytes(barr);
>
>         ByteBuffer buf = ByteBuffer.wrap(barr);
>
>         return Transaction.getRootAsTransaction(buf);
>
>     }
>
> }
>
> And register it to the runtime env before calling env.execute.
>
> env.registerTypeWithKryoSerializer(Transaction.class, 
> TransactionSerializer.class);
>
> env.getConfig().addDefaultKryoSerializer(Transaction.class, 
> TransactionSerializer.class);
>
> After that, I executed my job, however, I can see the log like this.
>
> 2021-11-18 10:35:07,624 INFO 
> org.apache.flink.api.java.typeutils.TypeExtractor [] - class 
> org.ff.dto.flatbuff.Transaction does not contain a setter for field bb_pos
>
> 2021-11-18 10:35:07,624 INFO 
> org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class 
> org.ff.dto.flatbuff.Transaction cannot be used as a POJO type because 
> not all fields are valid POJO fields, and must be processed as 
> GenericType. Please read the Flink documentation on "Data Types & 
> Serialization" for details of the effect on performance.
>
> It looks like the serializer is not working at all. So what’s the 
> problem about this? I register the serializer in a wrong way? Or do I 
> need to move the class to somewhere to make the flink classloader 
> recognize it?
>
> Thanks in advance.
>