Posted to dev@flink.apache.org by Nam-Luc Tran <na...@euranova.eu> on 2015/06/23 17:27:51 UTC

Error while deserializing event

Hello fellow Flinksters,

I am currently working on implementing Stale Synchronous Parallel (SSP)
iterations, based on the current bulk iterations. I have replacement
classes for IterationHeadPactTask and IterationSynchronizationTask, plus
the corresponding event handlers. Among the generated events is
ClockTaskEvent, which inherits from IterationEventWithAggregators and
adds an int member. I have implemented the write and read methods
accordingly and written serialization tests inspired by
EventAggregatorsTest.java. The tests pass and everything runs well on
a local setup.
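
To make the write/read pattern described above concrete, here is a
minimal stand-alone sketch. It is not the actual ClockTaskEvent code:
it uses plain java.io streams instead of Flink's data views, and the
names ClockEventSketch, BaseEvent, ClockEvent, payload and roundTrip
are all hypothetical. It only illustrates that the subclass must write
and read its extra int in the same position relative to the parent's
fields, and that readFully fails with an EOFException when fewer bytes
are available than the length prefix announces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ClockEventSketch {

    /** Hypothetical stand-in for the parent event: a length-prefixed byte payload. */
    static class BaseEvent {
        byte[] payload = new byte[0];

        void write(DataOutputStream out) throws IOException {
            out.writeInt(payload.length);
            out.write(payload);
        }

        void read(DataInputStream in) throws IOException {
            int length = in.readInt();
            payload = new byte[length];
            // Throws EOFException if fewer than `length` bytes remain.
            in.readFully(payload);
        }
    }

    /** Hypothetical subclass that appends one int after the parent's fields. */
    static class ClockEvent extends BaseEvent {
        int clock;

        @Override
        void write(DataOutputStream out) throws IOException {
            super.write(out);
            out.writeInt(clock);
        }

        @Override
        void read(DataInputStream in) throws IOException {
            // Must mirror write(): parent fields first, then the extra int.
            super.read(in);
            clock = in.readInt();
        }
    }

    /** Serializes the event to bytes and reads it back into a fresh instance. */
    static ClockEvent roundTrip(ClockEvent event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                event.write(out);
            }
            ClockEvent copy = new ClockEvent();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.read(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ClockEvent event = new ClockEvent();
        event.payload = new byte[] {1, 2, 3};
        event.clock = 42;
        ClockEvent copy = roundTrip(event);
        System.out.println("clock=" + copy.clock + ", payload length=" + copy.payload.length);
    }
}
```

A round trip like this only exercises the event class itself within one
JVM; it does not go through the network stack that the cluster run uses.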

Now, when run on a cluster, I encounter the following error:

java.io.IOException: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 13 more
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
    at org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
    at org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
    ... 17 more

What am I missing here? Do I need to register the new ClockTaskEvent
with a serializer somewhere? Also, these lines puzzle me:

    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)

Why does it go through the getNextBuffer method, given that
ClockTaskEvent is an event and not a buffer?

Thanks and best regards,

Tran Nam-Luc



Re: Error while deserializing event

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Tran Nam-Luc,

You don't have to register the event with a serializer. Can you share the
event code? I will look into it ASAP.

The runtime is buffer-oriented: events arrive as buffers before they are
deserialized. That's why you see the getNextBuffer call in the stack trace.
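
As a rough illustration of that point, the sketch below models a
channel that only ever hands out buffers, some of which carry a
serialized event. It is a simplified model, not Flink's actual
implementation: the names BufferOrEventSketch, Buffer, InputChannel,
serializeEvent and describeNext are hypothetical, and the "event" here
is just a single int.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

public class BufferOrEventSketch {

    /** Hypothetical wrapper: raw bytes plus a flag saying whether they encode an event. */
    static final class Buffer {
        final ByteBuffer bytes;
        final boolean isEvent;

        Buffer(ByteBuffer bytes, boolean isEvent) {
            this.bytes = bytes;
            this.isEvent = isEvent;
        }
    }

    /** Hypothetical channel: everything it delivers is a buffer, events included. */
    static final class InputChannel {
        private final Queue<Buffer> queue = new ArrayDeque<>();

        void enqueue(Buffer buffer) {
            queue.add(buffer);
        }

        Buffer getNextBuffer() {
            return queue.poll(); // null when nothing is queued
        }
    }

    /** Sender side: the event (here a single int) is turned into bytes first. */
    static Buffer serializeEvent(int clock) {
        ByteBuffer bytes = ByteBuffer.allocate(Integer.BYTES);
        bytes.putInt(clock);
        bytes.flip();
        return new Buffer(bytes, true);
    }

    /** Receiver side: fetch a buffer, then deserialize it only if it is flagged as an event. */
    static String describeNext(InputChannel channel) {
        Buffer buffer = channel.getNextBuffer();
        if (buffer == null) {
            return "empty";
        }
        if (buffer.isEvent) {
            int clock = buffer.bytes.getInt();
            return "event clock=" + clock;
        }
        return "data bytes=" + buffer.bytes.remaining();
    }

    public static void main(String[] args) {
        InputChannel channel = new InputChannel();
        channel.enqueue(serializeEvent(5));
        channel.enqueue(new Buffer(ByteBuffer.wrap(new byte[] {1, 2, 3}), false));
        System.out.println(describeNext(channel));
        System.out.println(describeNext(channel));
    }
}
```

In this model the reader always calls getNextBuffer first, which is why
a buffer-fetching frame shows up on the path even for events.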

– Ufuk
