You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by gu...@foxmail.com on 2022/03/09 07:10:00 UTC
Problem about adding custom kryo serializer
Hi,
I have an entity class built by Google Flatbuf, to raise the performance, I
have tried written a serializer class.
public class TransactionSerializer extends Serializer<Transaction> {
@Override
public void write(Kryo kryo, Output output, Transaction transaction) {
ByteBuffer byteBuffer = transaction.getByteBuffer();
byte[] generated = new byte[byteBuffer.remaining()];
byteBuffer.get(generated);
output.writeInt(generated.length, true);
output.writeBytes(generated);
}
@Override
public Transaction read(Kryo kryo, Input input, Class<Transaction>
aClass) {
int size = input.readInt(true);
byte[] barr = new byte[size];
input.readBytes(barr);
ByteBuffer buf = ByteBuffer.wrap(barr);
return Transaction.getRootAsTransaction(buf);
}
}
And register it to the runtime env before calling env.execute.
env.registerTypeWithKryoSerializer(Transaction.class,
TransactionSerializer.class);
env.getConfig().addDefaultKryoSerializer(Transaction.class,
TransactionSerializer.class);
After that, I executed my job, however, I can see the log like this.
2021-11-18 10:35:07,624 INFO
org.apache.flink.api.java.typeutils.TypeExtractor [] - class
org.ff.dto.flatbuff.Transaction does not contain a setter for field bb_pos
2021-11-18 10:35:07,624 INFO
org.apache.flink.api.java.typeutils.TypeExtractor [] - Class
class org.ff.dto.flatbuff.Transaction cannot be used as a POJO type because
not all fields are valid POJO fields, and must be processed as GenericType.
Please read the Flink documentation on "Data Types & Serialization" for
details of the effect on performance.
It looks like the serializer is not working at all. So what's the problem
about this? I register the serializer in a wrong way? Or do I need to move
the class to somewhere to make the flink classloader recognize it?
Thanks in advance.
Re: Problem about adding custom kryo serializer
Posted by Chesnay Schepler <ch...@apache.org>.
Sounds correct to me; it's not a POJO so it is treated as a generic
type, which go through Kryo.
If you want to be doubly-sure that your serializer is in fact used, add
a log statement to the read/write methods.
On 09/03/2022 08:10, guoliubin85@foxmail.com wrote:
>
> Hi,
>
> I have an entity class built by Google Flatbuf, to raise the
> performance, I have tried written a serializer class.
>
> public class TransactionSerializer extends Serializer<Transaction> {
>
> @Override
>
> public void write(Kryo kryo, Output output, Transaction transaction) {
>
> ByteBuffer byteBuffer = transaction.getByteBuffer();
>
> byte[] generated = new byte[byteBuffer.remaining()];
>
> byteBuffer.get(generated);
>
> output.writeInt(generated.length, true);
>
> output.writeBytes(generated);
>
> }
>
> @Override
>
> public Transaction read(Kryo kryo, Input input, Class<Transaction>
> aClass) {
>
> int size = input.readInt(true);
>
> byte[] barr = new byte[size];
>
> input.readBytes(barr);
>
> ByteBuffer buf = ByteBuffer.wrap(barr);
>
> return Transaction.getRootAsTransaction(buf);
>
> }
>
> }
>
> And register it to the runtime env before calling env.execute.
>
> env.registerTypeWithKryoSerializer(Transaction.class,
> TransactionSerializer.class);
>
> env.getConfig().addDefaultKryoSerializer(Transaction.class,
> TransactionSerializer.class);
>
> After that, I executed my job, however, I can see the log like this.
>
> 2021-11-18 10:35:07,624 INFO
> org.apache.flink.api.java.typeutils.TypeExtractor [] - class
> org.ff.dto.flatbuff.Transaction does not contain a setter for field bb_pos
>
> 2021-11-18 10:35:07,624 INFO
> org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class
> org.ff.dto.flatbuff.Transaction cannot be used as a POJO type because
> not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
>
> It looks like the serializer is not working at all. So what’s the
> problem about this? I register the serializer in a wrong way? Or do I
> need to move the class to somewhere to make the flink classloader
> recognize it?
>
> Thanks in advance.
>