You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@apex.apache.org by Vivek Bhide <bh...@gmail.com> on 2017/05/31 01:42:41 UTC

AvroToPojo Operator doesn't recover after failure and keeps throwing Kryo exception

I am using the AvroToPojo Malhar operator in conjunction with
AvroFileInputOperator for converting the avro records to POJO. While doing
the testing for application's stability, I found that AvroToPojo opwerator
doesn't recover in case of failure and keeps throwing below exception. This
in turn makes the whole application unstable and hence to be killed

The field for which it throws error 'ActiveFieldInfo' is a static inner
class and I am not sure on what can be done to have the operator recover
itself without any trouble. 

Any pointers on this issue will be really helpful

2017-05-30 17:15:46,826 INFO  stram.StreamingContainerParent
(StreamingContainerParent.java:log(170)) - child msg: deploy request failed:
[OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=brdn2244.target.com]]],
OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
0,
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
no-arg constructor): com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
Serialization trace:
columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
	at
com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
	at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
	at
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
	at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at
com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
	at com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
	at
com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
	at
com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
	at
com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
	at
com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
	at
com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
 context:
PTContainer[id=1(container_e21_1491404336779_1770158_01_000018),state=ACTIVE,operators=[PTOperator[id=2,name=fileReader$avroToPojo,state=PENDING_DEPLOY],
PTOperator[id=1,name=fileReader$fileReader,state=PENDING_DEPLOY]]]
2017-05-30 17:15:46,832 INFO  stram.StreamingContainerParent
(StreamingContainerParent.java:log(170)) - child msg:
java.lang.IllegalStateException: Deploy request failed:
[OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=brdn2244.target.com]]],
OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
0,
0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
	at
com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:836)
	at
com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
	at
com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created
(missing no-arg constructor):
com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
Serialization trace:
columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
	at
com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
	at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
	at
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
	at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at
com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
	at com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
	at
com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
	at
com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
	at
com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
	... 2 more


Regards
Vivek



--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/AvroToPojo-Operator-doesn-t-recover-after-failure-and-keeps-throwing-Kryo-exception-tp1660.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Re: AvroToPojo Operator doesn't recover after failure and keeps throwing Kryo exception

Posted by Vivek Bhide <bh...@gmail.com>.
Hi Sandesh. this worked as expected.

Regards
Vivek



--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/AvroToPojo-Operator-doesn-t-recover-after-failure-and-keeps-throwing-Kryo-exception-tp1660p1667.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Re: AvroToPojo Operator doesn't recover after failure and keeps throwing Kryo exception

Posted by Vivek Bhide <bh...@gmail.com>.
Thanks a lot Sandesh.. This problem was bugging for quite some time. I will
try these to see if it resolves the problem



--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/AvroToPojo-Operator-doesn-t-recover-after-failure-and-keeps-throwing-Kryo-exception-tp1660p1663.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Re: AvroToPojo Operator doesn't recover after failure and keeps throwing Kryo exception

Posted by Sandesh Hegde <sa...@datatorrent.com>.
AvroToPojo has a bug, transient modifier needs to be added to 2 fields.

private transient List<FieldInfo> fieldInfos;
private transient List<ActiveFieldInfo> columnFieldSetters;

Also there is one more bug in Avro input operator, there is a PR open for
that.
Fix is to add the below line to beginWindow call in the operator.
super.beginWindow(windowId);

In both the cases, you can copy the operator code to your repo,  and make
the changes mentioned.

Thanks



On Tue, May 30, 2017 at 6:42 PM Vivek Bhide <bh...@gmail.com> wrote:

> I am using the AvroToPojo Malhar operator in conjunction with
> AvroFileInputOperator for converting the avro records to POJO. While doing
> the testing for application's stability, I found that AvroToPojo opwerator
> doesn't recover in case of failure and keeps throwing below exception. This
> in turn makes the whole application unstable and hence to be killed
>
> The field for which it throws error 'ActiveFieldInfo' is a static inner
> class and I am not sure on what can be done to have the operator recover
> itself without any trouble.
>
> Any pointers on this issue will be really helpful
>
> 2017-05-30 17:15:46,826 INFO  stram.StreamingContainerParent
> (StreamingContainerParent.java:log(170)) - child msg: deploy request
> failed:
>
> [OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=
> brdn2244.target.com]]],
>
> OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
> no-arg constructor):
> com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
> Serialization trace:
> columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
>         at
>
> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>         at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>         at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>         at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at
>
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>         at
>
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at
>
> com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
>         at
> com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
>  context:
>
> PTContainer[id=1(container_e21_1491404336779_1770158_01_000018),state=ACTIVE,operators=[PTOperator[id=2,name=fileReader$avroToPojo,state=PENDING_DEPLOY],
> PTOperator[id=1,name=fileReader$fileReader,state=PENDING_DEPLOY]]]
> 2017-05-30 17:15:46,832 INFO  stram.StreamingContainerParent
> (StreamingContainerParent.java:log(170)) - child msg:
> java.lang.IllegalStateException: Deploy request failed:
>
> [OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=
> brdn2244.target.com]]],
>
> OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:836)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created
> (missing no-arg constructor):
> com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
> Serialization trace:
> columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
>         at
>
> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>         at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>         at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>         at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at
>
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>         at
>
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at
>
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at
>
> com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
>         at
> com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
>         at
>
> com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
>         ... 2 more
>
>
> Regards
> Vivek
>
>
>
> --
> View this message in context:
> http://apache-apex-users-list.78494.x6.nabble.com/AvroToPojo-Operator-doesn-t-recover-after-failure-and-keeps-throwing-Kryo-exception-tp1660.html
> Sent from the Apache Apex Users list mailing list archive at Nabble.com.
>