Posted to user@flink.apache.org by Padarn Wilson <pa...@gmail.com> on 2018/05/14 16:53:50 UTC

AvroInputFormat Serialisation Issue

Hi all - sorry this seems like a silly question, but I can't figure it out.

I'm using an AvroInputFormat in order to read an Avro file like this:

val textInputFormat = new AvroInputFormat[GenericRecord](infile,
classOf[GenericRecord])
val lines = env.readFile(textInputFormat, path)

This works fine in local mode, but when submitted to a flink cluster I get
serialisation errors that look like this:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
	... 7 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public"
	at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
	at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
	at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
	at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
	... 37 more



I realise this is an issue that is mentioned in the documentation, but
given that the problem seems to be with some class inside the
AvroInputFormat that cannot be serialised, I'm not sure what the best
solution is.

This works fine if I specify a concrete class instead of the generic one, i.e.

val textInputFormat = new AvroInputFormat[Example](infile,
classOf[Example])
val lines = env.readFile(textInputFormat, path)

However, I can't get this to run in local mode with a case class `Example`
that is nested, which is required as the Avro files have very nested fields.

Re: AvroInputFormat Serialisation Issue

Posted by Vinay Patil <vi...@gmail.com>.
Hi,

Changing the classloader config to parent-first solved the issue.
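
For reference, a minimal sketch of that setting, assuming it was changed cluster-wide in flink-conf.yaml (the message does not say where it was applied). `classloader.resolve-order` is the Flink option that controls this, and its default is child-first:

```yaml
# flink-conf.yaml (assumed location of the change)
# Resolve classes from Flink's own classpath before the user jar.
classloader.resolve-order: parent-first
```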

Regards,
Vinay Patil


On Wed, Nov 7, 2018 at 7:25 AM Vinay Patil <vi...@gmail.com> wrote:

> Hi,
>
> Can someone please help here?
>
> On Nov 6, 2018 10:46 PM, "Vinay Patil [via Apache Flink User Mailing List
> archive.]" <ml...@n4.nabble.com> wrote:

Re: AvroInputFormat Serialisation Issue

Posted by Vinay Patil <vi...@gmail.com>.
Hi,

I am facing a similar issue today with Flink 1.6.0 - AvroOutputFormat

AvroOutputFormat<GenericRecord> tuple2AvroOutputFormat =
        new AvroOutputFormat<>(new Path("<path>"), GenericRecord.class);

testDataSet
        .map(new GenerateGenericRecord())
        .returns(AvroTypeInfo.of(GenericRecord.class))
        .output(tuple2AvroOutputFormat);

The exception follows (I have enabled the forceAvro config; not sure why
it still goes to the Kryo serializer):

com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

Please let me know if there is a fix for this issue as I have not faced this
problem for DataStreams.
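
One direction that may keep GenericRecord away from Kryo is sketched below. It rests on assumptions the thread does not confirm: that the flink-avro version in use ships `GenericRecordAvroTypeInfo`, and that the schema is known when the job is built. A likely reason forceAvro has no effect here, also an assumption, is that it only applies to types Flink analyses as POJOs, which GenericRecord is not. The class name `GenericRecordOutput`, the one-field schema, and the mapper `ToRecord` are hypothetical stand-ins for the job above.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class GenericRecordOutput {

    // Hypothetical one-field schema, used only for illustration.
    static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Line\","
                    + "\"fields\":[{\"name\":\"text\",\"type\":\"string\"}]}";

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // args[0] is the output path (an assumption of this sketch).
        AvroOutputFormat<GenericRecord> output =
                new AvroOutputFormat<>(new Path(args[0]), GenericRecord.class);
        // Generic records carry no generated class, so the schema is set explicitly.
        output.setSchema(schema);

        env.fromElements("a", "b")
                .map(new ToRecord())
                // Schema-aware type information instead of the generic (Kryo) fallback.
                .returns(new GenericRecordAvroTypeInfo(schema))
                .output(output);

        env.execute("generic-record-output");
    }

    // Hypothetical mapper that wraps each string in a GenericRecord.
    static class ToRecord implements MapFunction<String, GenericRecord> {
        // Parsed lazily on the worker so the function does not ship a Schema object.
        private transient Schema schema;

        @Override
        public GenericRecord map(String value) {
            if (schema == null) {
                schema = new Schema.Parser().parse(SCHEMA_JSON);
            }
            GenericRecord record = new GenericData.Record(schema);
            record.put("text", value);
            return record;
        }
    }
}
```

Whether this removes the exception on a given cluster is untested here; the parent-first classloading change reported elsewhere in this thread is the only fix the thread itself confirms.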

Regards,
Vinay Patil





Re: AvroInputFormat Serialisation Issue

Posted by Timo Walther <tw...@apache.org>.
Flink should not interact poorly with heavily nested schemas. So this 
might be another bug that is worth investigating. Can you share an 
example that reproduces your issues with us? Which Flink version are you 
using?

Contributors are always welcome :) I will also take a look into the 
serialization issue otherwise.

Regards,
Timo

On 15.05.18 at 17:33, Padarn Wilson wrote:
> > usually people are using the AvroInputFormat with the Avro class 
> generated by an Avro schema
>
> This is actually what I was doing, but it seems to interact poorly
> with heavily nested schemas. My schema has a field which is a
> `List[SubSchema]`, where SubSchema is another Avro schema.
>
> > What do you mean by "this is an issue that is mentioned in the
> documentation"? Where is this issue documented?
>
> I was referring to this: 
> https://flink.apache.org/faq.html#i-have-a-notserializableexception
>
> > So your exception seems to be a bug if it works locally but not 
> distributed.
>
> Hmm, well it's nice to know I'm not just doing something stupid :-)
> Perhaps I'll try compiling Flink myself so I can debug this.
>
> On Tue, May 15, 2018 at 8:54 PM Timo Walther <twalthr@apache.org> wrote:


Re: AvroInputFormat Serialisation Issue

Posted by Timo Walther <tw...@apache.org>.
Hi Padarn,

usually people are using the AvroInputFormat with the Avro class
generated by an Avro schema. But after looking into the implementation,
one should also be able to use the GenericRecord class as a parameter.
So your exception seems to be a bug if it works locally but not
distributed. What do you mean by "this is an issue that is mentioned
in the documentation"? Where is this issue documented?

Regards,
Timo


