Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2015/07/21 15:04:49 UTC

Re: Error while deserializing event

Hi!

Actually, the lib folder should cover it: it contains all the code, and
that is what is relevant there.

A common cause of such problems is a version mismatch, meaning one node
runs one version of the code and another node a different one. That
sometimes happens when shared folders are not fully synced. I am not sure
whether that is what happened here.
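If unsynced shared folders are the suspect, one way to check is to compare checksums of the deployed jars on every node. The following is a minimal sketch, assuming all relevant code sits as *.jar files directly in one lib directory (it does not recurse). The class name LibChecksums is hypothetical and not part of Flink.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: prints a SHA-256 digest for every jar in a lib directory. */
public class LibChecksums {

    /** Returns jar file name -> hex SHA-256, sorted by name so outputs from different nodes diff cleanly. */
    public static Map<String, String> digests(Path libDir) throws IOException, NoSuchAlgorithmException {
        Map<String, String> result = new TreeMap<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                result.put(jar.getFileName().toString(), sha256(jar));
            }
        }
        return result;
    }

    /** Streams the file through SHA-256 and returns the digest as lowercase hex. */
    static String sha256(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        for (Map.Entry<String, String> e : digests(libDir).entrySet()) {
            System.out.println(e.getValue() + "  " + e.getKey());
        }
    }
}
```

Run it on each node against the same directory and diff the outputs. A line that differs points at a jar that is not in sync across the cluster.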

Greetings,
Stephan


On Wed, Jun 24, 2015 at 10:54 AM, Nam-Luc Tran <na...@euranova.eu>
wrote:

> Thanks for the reply, Ufuk.
>
> After a full redeployment, i.e. copying the entire build-target folder
> to the workers instead of only updating the build-target/lib folder,
> the problem disappeared.
>
> From this experience, I conclude that any change involving the
> serialization of objects has implications beyond the big jar file.
> Could you confirm?
>
> Best regards,
>
> Tran Nam-Luc
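The java.io.EOFException at the root of the quoted stack trace is what a reader raises when it expects more bytes than the writer produced. That is consistent with sender and receiver running different versions of an event's write/read code. The sketch below reproduces this failure mode with plain java.io streams. It is not Flink code: the "old writer" and "new reader" methods are hypothetical stand-ins for two nodes running different builds.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

/** Shows how a reader expecting more fields than the writer produced fails with EOFException. */
public class MismatchDemo {

    /** "Old" writer, e.g. on a node with stale code: writes only one int field. */
    static byte[] writeOld(int superstep) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(superstep);
        }
        return bytes.toByteArray();
    }

    /** "New" reader: expects that int followed by an extra int field. */
    static int[] readNew(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int superstep = in.readInt();
            int clock = in.readInt(); // no bytes left here, so this throws EOFException
            return new int[] {superstep, clock};
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            readNew(writeOld(7));
            System.out.println("read succeeded");
        } catch (EOFException e) {
            // prints "EOFException: reader expected more bytes than the writer produced"
            System.out.println("EOFException: reader expected more bytes than the writer produced");
        }
    }
}
```

A local round-trip test cannot catch this, because the same class writes and reads the bytes. The mismatch only shows up when two differently built processes exchange events.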
>
> On Tuesday, 23/06/2015 at 23:27, Ufuk Celebi wrote:
>
> Hey Tran Nam-Luc,
>
> You don't have to register the event with a serializer. Can you share
> the event code? I will look into it asap.
>
> The runtime is buffer-oriented: events arrive as buffers before they
> are deserialized. That's why you see the getNextBuffer call in the
> stack trace.
>
> – Ufuk
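Ufuk's two points (no registration is needed, and events travel as buffers) can be illustrated with a simplified serializer. It writes the event's class name ahead of the event's fields, and the receiver instantiates that class by name before calling read(). This is a sketch under that assumption only. It uses plain java.io and java.nio instead of Flink's EventSerializer and DataInputDeserializer, and the Event interface, PingEvent class and toBuffer/fromBuffer methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/** Simplified sketch: events travel as byte buffers tagged with their class name. */
public class EventBufferSketch {

    /** Hypothetical stand-in for an event type with symmetric write/read methods. */
    public interface Event {
        void write(DataOutput out) throws IOException;
        void read(DataInput in) throws IOException;
    }

    /** Example event; it needs a public no-arg constructor so it can be created reflectively. */
    public static class PingEvent implements Event {
        public int value;
        public PingEvent() {}
        public PingEvent(int value) { this.value = value; }
        public void write(DataOutput out) throws IOException { out.writeInt(value); }
        public void read(DataInput in) throws IOException { value = in.readInt(); }
    }

    /** Sender side: class name first, then the event's own fields, wrapped as a buffer. */
    static ByteBuffer toBuffer(Event event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.getClass().getName());
            event.write(out);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    /** Receiver side: the class name in the buffer decides which class is instantiated. */
    static Event fromBuffer(ByteBuffer buffer, ClassLoader loader) throws Exception {
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String className = in.readUTF();
            Class<? extends Event> clazz = Class.forName(className, true, loader).asSubclass(Event.class);
            Event event = clazz.getDeclaredConstructor().newInstance();
            event.read(in); // a field-layout mismatch between builds would surface here
            return event;
        }
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer buffer = toBuffer(new PingEvent(42));
        PingEvent copy = (PingEvent) fromBuffer(buffer, EventBufferSketch.class.getClassLoader());
        System.out.println(copy.value); // prints 42
    }
}
```

Because the class name travels with the payload, the receiver needs no registry. It does, however, need the same class, with the same field layout, on its own classpath.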
>
> On Tuesday, June 23, 2015, Nam-Luc Tran  wrote:
>
> > Hello fellow Flinksters,
> >
> > I am currently working on implementing Stale Synchronous Parallel
> > (SSP) iterations based on the current bulk iterations. I have
> > replacement classes for IterationHeadPactTask,
> > IterationSynchronizationTask and the corresponding event handlers to
> > do the job. Among the generated events, I have ClockTaskEvent, which
> > inherits from IterationEventWithAggregators and adds an int member. I
> > have implemented the write and read methods accordingly and written
> > serialization tests inspired by EventAggregatorsTest.java. The tests
> > pass and everything runs well on a local setup.
> >
> > However, when I run it on a cluster, I get the following error:
> >
> > java.io.IOException: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >     at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
> >     at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
> >     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> >     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> >     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> >     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> >     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> >     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> >     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> >     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> >     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> >     ... 1 more
> > Caused by: java.lang.RuntimeException: Error while deserializing event.
> >     at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
> >     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
> >     ... 13 more
> > Caused by: java.io.EOFException
> >     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
> >     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
> >     at org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
> >     at org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
> >     at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
> >     ... 17 more
> >
> > What am I missing here? Should I register the new ClockTaskEvent
> > with some serializer somewhere? Also, these lines bother me:
> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
> >
> > Why does it go through the getNextBuffer method when ClockTaskEvent
> > is an event and not a buffer?
> >
> > Thanks and best regards,
> >
> > Tran Nam-Luc
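The pattern described for ClockTaskEvent, a subclass that appends one int member to its parent's serialized form, can be sketched as follows. BaseEvent and ClockEvent are hypothetical stand-ins for IterationEventWithAggregators and ClockTaskEvent, with made-up parent fields, and the sketch uses java.io.DataOutput/DataInput instead of Flink's own stream types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch of an event subclass that appends one int field to its parent's serialized form. */
public class ClockEventSketch {

    /** Hypothetical parent event; its fields are invented for illustration. */
    public static class BaseEvent {
        protected String name = "";
        protected long total;

        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(total);
        }

        public void read(DataInput in) throws IOException {
            name = in.readUTF();
            total = in.readLong();
        }
    }

    /** Hypothetical ClockTaskEvent-like subclass with one extra int member. */
    public static class ClockEvent extends BaseEvent {
        private int clock;

        public ClockEvent() {}

        public ClockEvent(String name, long total, int clock) {
            this.name = name;
            this.total = total;
            this.clock = clock;
        }

        public int getClock() { return clock; }

        @Override
        public void write(DataOutput out) throws IOException {
            super.write(out);    // parent fields first...
            out.writeInt(clock); // ...then the subclass field
        }

        @Override
        public void read(DataInput in) throws IOException {
            super.read(in);       // must consume exactly what super.write produced
            clock = in.readInt(); // same order as write()
        }
    }

    /** Round trip through a byte array, as a local serialization test would do. */
    static ClockEvent roundTrip(ClockEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            event.write(out);
        }
        ClockEvent copy = new ClockEvent();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.read(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        ClockEvent copy = roundTrip(new ClockEvent("agg", 10L, 3));
        System.out.println(copy.name + " " + copy.total + " " + copy.getClock()); // prints "agg 10 3"
    }
}
```

The essential rule is that read() consumes fields in exactly the order write() produced them, starting with the parent's fields.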