Posted to user@flink.apache.org by vinay patil <vi...@gmail.com> on 2017/08/25 16:51:46 UTC

Serialization issues with DataStreamUtils

Hi Guys,

I am using DataStreamUtils for unit testing. The test case succeeds when it
is run individually, but I get the following error when all the tests are
run:

Serialization trace:
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:114)

I tried to register the above classes, but it did not work. Also, this
error appears randomly: some tests fail while others pass.

What could be the issue?

Regards,
Vinay Patil




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Serialization-issues-with-DataStreamUtils-tp15139.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Serialization issues with DataStreamUtils

Posted by vinay patil <vi...@gmail.com>.
Hi,

After adding the following two lines the serialization trace does not show
the Schema related classes:

env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class,
        Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
env.getConfig().addDefaultKryoSerializer(Schema.class,
        Serializers.AvroSchemaSerializer.class);

However, I still get an exception for:
Serialization trace:
schema (org.apache.avro.generic.GenericData$Record)

The default Kryo serializer is not able to serialize the GenericData.Record
class.

Is there any other way I can get rid of this exception?

P.S. I do not see this exception when I run the actual pipeline; it only
occurs in one of our test cases.




Re: Serialization issues with DataStreamUtils

Posted by Vinay Patil <vi...@gmail.com>.
Can anyone please help me with this issue?

On Aug 31, 2017 5:20 PM, "Vinay Patil" <vi...@gmail.com> wrote:

> Hi,
>
> After adding the following two lines the serialization trace does not show
> the Schema related classes:
>
> env.getConfig().registerTypeWithKryoSerializer(GenericData.Array.class,
>         Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
> env.getConfig().addDefaultKryoSerializer(Schema.class,
>         Serializers.AvroSchemaSerializer.class);
>
> However, I still get an exception for:
> Serialization trace:
> schema (org.apache.avro.generic.GenericData$Record)
>
> The default Kryo serializer is not able to serialize the GenericData.Record
> class.
>
> Is there any other way I can get rid of this exception?
>
> P.S. I do not see this exception when I run the actual pipeline; it only
> occurs in one of our test cases.
>
> Regards,
> Vinay Patil
>
> On Sat, Aug 26, 2017 at 7:47 PM, vinay patil [via Apache Flink User
> Mailing List archive.] <ml...@n4.nabble.com> wrote:
>
>> Hi Robert,
>>
>> The test case code is as follows:
>> GenericRecord testData = new GenericData.Record(avroSchema);
>> SingleOutputStreamOperator<GenericRecord> testStream =
>>     env.fromElements(testData)
>>        .map(new DummyOperator(...));
>>
>> Iterator<GenericRecord> iterator = DataStreamUtils.collect(testStream);
>>
>> Here is the complete stack trace:
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 229
>> Serialization trace:
>> reserved (org.apache.avro.Schema$NullSchema)
>> types (org.apache.avro.Schema$UnionSchema)
>> schema (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>>         at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:149)
>>         at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112)
>>

Re: Serialization issues with DataStreamUtils

Posted by vinay patil <vi...@gmail.com>.
Hi Robert,

The test case code is as follows:
GenericRecord testData = new GenericData.Record(avroSchema);
SingleOutputStreamOperator<GenericRecord> testStream =
    env.fromElements(testData)
       .map(new DummyOperator(...));

Iterator<GenericRecord> iterator = DataStreamUtils.collect(testStream);

Here is the complete stack trace:

Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 229
Serialization trace:
reserved (org.apache.avro.Schema$NullSchema)
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
	at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:149)
	at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112)
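
For readers of the trace: "Encountered unregistered class ID" is thrown on the reading side (DefaultClassResolver.readClass) when a numeric class ID found in the byte stream has no entry in the reader's own registration table. One possible reading, offered as an assumption and not a confirmed diagnosis of this test, is that the side writing the records and the side reading them in SocketStreamIterator were configured with different registrations. Below is a minimal sketch of that failure mode using a toy registry. ToyRegistry, roundTrip and the class name are hypothetical, standard-library only, and are not Kryo's or Flink's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, NOT Kryo): classes are written as small integer IDs
// that are assigned in registration order.
class ClassIdMismatchSketch {

    static final class ToyRegistry {
        final Map<Integer, Class<?>> byId = new HashMap<>();
        final Map<Class<?>, Integer> byClass = new HashMap<>();
        int nextId = 0;

        // Assign the next free ID to the class; repeated registrations are ignored.
        void register(Class<?> type) {
            if (byClass.containsKey(type)) {
                return;
            }
            byId.put(nextId, type);
            byClass.put(type, nextId);
            nextId++;
        }

        // Writer side: look up the ID that goes into the stream.
        int idOf(Class<?> type) {
            Integer id = byClass.get(type);
            if (id == null) {
                throw new IllegalStateException("Class is not registered: " + type.getName());
            }
            return id;
        }

        // Reader side: resolve an ID from the stream back to a class.
        Class<?> classOf(int id) {
            Class<?> type = byId.get(id);
            if (type == null) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return type;
        }
    }

    // Write the class ID with one registry and resolve it with another.
    // Returns the resolved class name, or the error message on failure.
    static String roundTrip(ToyRegistry writer, ToyRegistry reader, Class<?> type) {
        int id = writer.idOf(type);
        try {
            return reader.classOf(id).getName();
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ToyRegistry writer = new ToyRegistry();
        writer.register(String.class);
        writer.register(ArrayList.class);

        // Reader configured exactly like the writer.
        ToyRegistry matchingReader = new ToyRegistry();
        matchingReader.register(String.class);
        matchingReader.register(ArrayList.class);

        // Reader that is missing one registration.
        ToyRegistry shorterReader = new ToyRegistry();
        shorterReader.register(String.class);

        System.out.println(roundTrip(writer, matchingReader, ArrayList.class));
        System.out.println(roundTrip(writer, shorterReader, ArrayList.class));
    }
}
```

In this toy model the round trip only works when both sides register the same classes in the same order. If the same holds in the real test, it would be worth checking that the registrations are made on the environment before the stream is built and before DataStreamUtils.collect is called.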




Re: Serialization issues with DataStreamUtils

Posted by Robert Metzger <rm...@apache.org>.
Hi Vinay,

Could you provide the full stack trace and the data types you are using in
your streaming application, so that we can fully understand the problem?

Regards,
Robert


On Fri, Aug 25, 2017 at 6:51 PM, vinay patil <vi...@gmail.com>
wrote:

> Hi Guys,
>
> I am using DataStreamUtils for unit testing. The test case succeeds when it
> is run individually, but I get the following error when all the tests are
> run:
>
> Serialization trace:
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>         at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:114)
>
> I tried to register the above classes, but it did not work. Also, this
> error appears randomly: some tests fail while others pass.
>
> What could be the issue?
>
> Regards,
> Vinay Patil
>