Posted to user@beam.apache.org by Ajit Dongre <Aj...@walmartlabs.com> on 2020/12/01 05:06:44 UTC

NotSerializableException in Spark runner

Hi all,

I have a pipeline that reads data from Kafka and writes it to files. I am using Beam 2.12 with the Spark runner in Java. While executing it I get the exception below:

org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: java.io.NotSerializableException: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
        - object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}})
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (Tag<com.walmart.dataplatform.aorta.river.fn.FilterPCollection.<init>:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}))
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
        at org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608)
        at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
        at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
        at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
        at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)


com.walmart.dataplatform.aorta.river.meta.Payload is serializable, and I also observed that this exception does not occur for all records.

Please suggest a solution to this problem.

Regards,
Ajit Dongre

Re: NotSerializableException in Spark runner

Posted by Alexey Romanenko <ar...@gmail.com>.
Could you make sure that the instance of com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed record, is serializable?
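
[Editor's note] One way to verify that is a plain Java serialization round trip, which mirrors what Spark's JavaSerializer does when it spills or shuffles elements. The sketch below is illustrative only: the `Payload` class here is hypothetical (only its name matches the class in the trace; its fields are made up), and the original class would need every non-transient field to be serializable as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for com.walmart.dataplatform.aorta.river.meta.Payload.
// Implementing Serializable is not enough on its own: every non-transient
// field must itself be serializable, which is why only some records may fail.
public class Payload implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String id;
    // A field that cannot be serialized must be transient (and rebuilt lazily).
    private transient StringBuilder scratch;

    public Payload(String id) { this.id = id; }
    public String getId() { return id; }

    // Round-trip an instance through Java serialization, as Spark would.
    // Throws NotSerializableException if any reachable field is not serializable.
    public static Payload roundTrip(Payload p) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(p);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Payload) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Payload copy = roundTrip(new Payload("record-1"));
        System.out.println(copy.getId());
    }
}
```

Running the round trip against the actual payloads of the failing records (rather than a sample record) would show whether some records carry a non-serializable field value that others do not.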

> On 1 Dec 2020, at 06:06, Ajit Dongre <Aj...@walmartlabs.com> wrote:
> 
> Hi all,
>  
> I have a pipeline that reads data from Kafka and writes it to files. I am using Beam 2.12 with the Spark runner in Java. While executing it I get the exception below:
>  
> [stack trace identical to the one in the original message above; snipped]
>  
> Please suggest solution to this problem.
>  
> Regards,
> Ajit Dongre