You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2017/08/22 15:42:03 UTC

Expception with Avro Serialization on RocksDBStateBackend

Hi,

I am getting the following exception in my code, I can observe that there's
something wrong while serializing my Object, the class of which looks
something like this:
 
https://gist.github.com/revolutionisme/1eea5ccf5e1d4a5452f27a1fd5c05ff1

The exact cause it seems is some field inside my nested object which is null
(reversalIndicator ), but its not exactly clear why this exception is
thrown, one interesting thing to note is when I serialized with kryo before,
it serialized properly without any issues. Is it some requirement of the
avro serializer or some bug ? or Some problem on my end? 



2017-08-22 17:21:48,892 ERROR
com.airplus.poc.flink.statefulFunctions.UpdateTxnState        - Something
unexpected happened - probably malformed event
java.lang.RuntimeException: Error while adding data to RocksDB
        at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
        at
com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(UpdateTxnState.java:98)
        at
com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(UpdateTxnState.java:1)
        at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: in
com.airplus.poc.flink.model.TransactionStateModel in
com.airplus.poc.generated.xjc.RecordReadEventType in
com.airplus.poc.generated.xjc.RawTransactionItemType in string null of
string in field reversalIndicator of
com.airplus.poc.generated.xjc.RawTransactionItemType in field
rawTransactionItem of com.airplus.poc.generated.xjc.RecordReadEventType in
field recordReadEvent of com.airplus.poc.flink.model.TransactionStateModel
        at
org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
        at
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at
org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(AvroSerializer.java:135)
        at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
        ... 8 more
Caused by: java.lang.NullPointerException



Thanks & Regards,
Biplob



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expception-with-Avro-Serialization-on-RocksDBStateBackend-tp15067.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Expception with Avro Serialization on RocksDBStateBackend

Posted by Biplob Biswas <re...@gmail.com>.
Hi,

I am still stuck here, and I still couldn't find a way to make Avro accept
null values. 

Any help here would be really appreciated.

Thanks,
Biplob



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Expception with Avro Serialization on RocksDBStateBackend

Posted by Biplob Biswas <re...@gmail.com>.
Hi Till, 

Thanks for the response.  I was assuming that the Avro Serializer will
create a corresponding Avro schema with the Object class I provide. In that
respect, I did the following:

AvroSerializer<TransactionStateModel> txnAvroSerde = new
AvroSerializer<>(TransactionStateModel.class);
    ValueStateDescriptor<TransactionStateModel> stateDescriptor = new
ValueStateDescriptor<>(
            "transaction", txnAvroSerde);

    stateDescriptor.setQueryable("transaction");

    this.txnState = getRuntimeContext().getState(stateDescriptor);


By doing this, I was expecting that the avro serializer would convert my
data into avro format using the corresponding avro schema created with the
information from my class. And correspondingly this data in the avro format
to be stored in the RcoksDB statebackend.

Is my assumption wrong? I am using Flink 1.3.2 along with the
flink-avro_2.10 library for the same flink version which internally has avro
1.7.7

Thanks,
Biplob




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Expception-with-Avro-Serialization-on-RocksDBStateBackend-tp15067p15103.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Expception with Avro Serialization on RocksDBStateBackend

Posted by Till Rohrmann <tr...@apache.org>.
Hi Biplob,

have you told Avro to allow null for fields in your schema? If yes, then
could you share the Avro schema, the version of Flink as well as the Avro
version with us? This would help with further understanding the problem.

Cheers,
Till

On Tue, Aug 22, 2017 at 5:42 PM, Biplob Biswas <re...@gmail.com>
wrote:

> Hi,
>
> I am getting the following exception in my code, I can observe that there's
> something wrong while serializing my Object, the class of which looks
> something like this:
>
> https://gist.github.com/revolutionisme/1eea5ccf5e1d4a5452f27a1fd5c05ff1
>
> The exact cause it seems is some field inside my nested object which is
> null
> (reversalIndicator ), but its not exactly clear why this exception is
> thrown, one interesting thing to note is when I serialized with kryo
> before,
> it serialized properly without any issues. Is it some requirement of the
> avro serializer or some bug ? or Some problem on my end?
>
>
>
> 2017-08-22 17:21:48,892 ERROR
> com.airplus.poc.flink.statefulFunctions.UpdateTxnState        - Something
> unexpected happened - probably malformed event
> java.lang.RuntimeException: Error while adding data to RocksDB
>         at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(
> RocksDBValueState.java:102)
>         at
> com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(
> UpdateTxnState.java:98)
>         at
> com.airplus.poc.flink.statefulFunctions.UpdateTxnState.processElement(
> UpdateTxnState.java:1)
>         at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.
> processElement(KeyedProcessOperator.java:94)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(
> StreamInputProcessor.java:206)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:69)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.NullPointerException: in
> com.airplus.poc.flink.model.TransactionStateModel in
> com.airplus.poc.generated.xjc.RecordReadEventType in
> com.airplus.poc.generated.xjc.RawTransactionItemType in string null of
> string in field reversalIndicator of
> com.airplus.poc.generated.xjc.RawTransactionItemType in field
> rawTransactionItem of com.airplus.poc.generated.xjc.RecordReadEventType in
> field recordReadEvent of com.airplus.poc.flink.model.TransactionStateModel
>         at
> org.apache.avro.reflect.ReflectDatumWriter.write(
> ReflectDatumWriter.java:145)
>         at
> org.apache.avro.generic.GenericDatumWriter.write(
> GenericDatumWriter.java:58)
>         at
> org.apache.flink.api.java.typeutils.runtime.AvroSerializer.serialize(
> AvroSerializer.java:135)
>         at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(
> RocksDBValueState.java:99)
>         ... 8 more
> Caused by: java.lang.NullPointerException
>
>
>
> Thanks & Regards,
> Biplob
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Expception-with-
> Avro-Serialization-on-RocksDBStateBackend-tp15067.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>