Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2017/09/19 13:41:01 UTC

[jira] [Commented] (SPARK-21928) ML LogisticRegression training occasionally produces java.lang.ClassNotFoundException when attempting to load custom Kryo registrator class

    [ https://issues.apache.org/jira/browse/SPARK-21928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171715#comment-16171715 ] 

Imran Rashid commented on SPARK-21928:
--------------------------------------

Hi [~jbrock],

Thanks for reporting this.  I have another bug report which I think is similar.  I don't think this actually has anything to do with ML; it can happen any time cached RDD blocks get sent over the network (though that is perhaps a bit more likely to happen with ML workloads).

I do have one question for you, though -- in the other bug report I have, the user says that when they hit this error, the executor gets stuck, which eventually leads to the application failing.  However, I haven't been able to reproduce that so far -- there are some errors, but from what I see, the executor just gives up fetching the remote data, and regenerates it locally.  What have you observed when this happens?

More details:

As of SPARK-13990 & SPARK-13926, Spark's SerializerManager has [its own instance of a KryoSerializer|https://github.com/apache/spark/blob/581200af717bcefd11c9930ac063fe53c6fd2fde/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala#L42] which does not have the defaultClassLoader set on it. For normal task execution that doesn't cause problems, because the serializer falls back to the current thread's context classloader, which is set correctly on task threads anyway.
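
To make that concrete, here's a simplified sketch of the class lookup (not the exact Spark source -- the real logic lives in KryoSerializer.newKryo, and names here are abbreviated):

{code:scala}
// Simplified sketch, not the actual Spark source: how the registrator class
// gets resolved when a new Kryo instance is created.
def resolveRegistrator(
    registratorClassName: String,
    defaultClassLoader: Option[ClassLoader]): Class[_] = {
  // Fall back to the calling thread's context classloader when no
  // defaultClassLoader was set on the serializer.
  val loader = defaultClassLoader
    .getOrElse(Thread.currentThread.getContextClassLoader)
  // On a task thread this is a MutableURLClassLoader that includes the user
  // jars; on a netty IO thread it is just the AppClassLoader, so the lookup
  // throws ClassNotFoundException for the custom registrator.
  Class.forName(registratorClassName, true, loader)
}
{code}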

However, netty maintains its own thread pool, and those threads don't change their classloader to include the extra user jars needed for the custom kryo registrator. That only matters when cached RDD blocks are sent across the network, which won't happen often, because spark tries to schedule tasks where the RDDs are already cached.  (You'll notice your stack trace includes netty underneath.)

This doesn't affect the shuffle path, because the serde is never done in the threads created by netty.

I think a fix for this should be fairly straightforward: we just need to set the classloader on that extra kryo instance.
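
Something along these lines (sketch only, not an actual patch -- the method below is hypothetical): expose a hook on SerializerManager that forwards the executor's task classloader to its private KryoSerializer, using the setDefaultClassLoader that already exists on the Serializer base class:

{code:scala}
// Sketch only, not the actual patch: give SerializerManager's private
// KryoSerializer the same classloader the executor uses for tasks.
// This method is hypothetical and would live in SerializerManager.
private[spark] def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
  // kryoSerializer is SerializerManager's own KryoSerializer instance.
  kryoSerializer.setDefaultClassLoader(classLoader)
}
{code}

The executor would then call this once, right after it builds the MutableURLClassLoader that contains the user jars, so the netty threads end up resolving the registrator against the right loader.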

> ML LogisticRegression training occasionally produces java.lang.ClassNotFoundException when attempting to load custom Kryo registrator class
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21928
>                 URL: https://issues.apache.org/jira/browse/SPARK-21928
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.2.0
>            Reporter: John Brock
>
> I unfortunately can't reliably reproduce this bug; it happens only occasionally, when training a logistic regression model with very large datasets. The training will often proceed through several {{treeAggregate}} calls without any problems, and then suddenly workers will start running into this {{java.lang.ClassNotFoundException}}.
> After doing some debugging, it seems that whenever this error happens, Spark is trying to use the {{sun.misc.Launcher$AppClassLoader}} {{ClassLoader}} instance instead of the usual {{org.apache.spark.util.MutableURLClassLoader}}. {{MutableURLClassLoader}} can see my custom Kryo registrator, but the {{AppClassLoader}} instance can't.
> When this error does pop up, it's usually accompanied by the task seeming to hang, and I need to kill Spark manually.
> I'm running a Spark application in cluster mode via spark-submit, and I have a custom Kryo registrator. The JAR is built with {{sbt assembly}}.
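> A minimal registrator sketch, just to illustrate the shape (the class name and registrations below are placeholders, not the actual application code):
> {code:java}
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> // Placeholder sketch; the real registrator lists the application's classes.
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
>     kryo.register(classOf[Array[Double]])                        // example registration
>     kryo.register(classOf[org.apache.spark.ml.linalg.DenseVector])  // example registration
>   }
> }
> {code}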
> Exception message:
> {noformat}
> 17/08/29 22:39:04 ERROR TransportRequestHandler: Error opening block StreamChunkId{streamId=542074019336, chunkIndex=0} for request from /10.0.29.65:34332
> org.apache.spark.SparkException: Failed to register classes with Kryo
>     at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:139)
>     at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:292)
>     at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:277)
>     at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:186)
>     at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:169)
>     at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1382)
>     at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1377)
>     at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
>     at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1377)
>     at org.apache.spark.storage.memory.MemoryStore.org$apache$spark$storage$memory$MemoryStore$$dropBlock$1(MemoryStore.scala:524)
>     at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:545)
>     at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:539)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:539)
>     at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:92)
>     at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:73)
>     at org.apache.spark.memory.StaticMemoryManager.acquireStorageMemory(StaticMemoryManager.scala:72)
>     at org.apache.spark.storage.memory.MemoryStore.putBytes(MemoryStore.scala:147)
>     at org.apache.spark.storage.BlockManager.maybeCacheDiskBytesInMemory(BlockManager.scala:1143)
>     at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:594)
>     at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:559)
>     at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:559)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:559)
>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:353)
>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
>     at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:89)
>     at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:125)
>     at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>     at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: com.foo.bar.MyKryoRegistrator
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:134)
>     at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:134)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:134)
>     ... 60 more
> {noformat}
> My Spark session is created like so:
> {code:java}
> val spark = SparkSession.builder()
>                 .appName("FooBar")
>                 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>                 .config("spark.kryoserializer.buffer.max", "2047m")                                        
>                 .config("spark.kryo.registrator","com.foo.bar.MyKryoRegistrator")
>                 .config("spark.kryo.registrationRequired", "true")
>                 .config("spark.network.timeout", "3600s")
>                 .config("spark.driver.maxResultSize", "0")
>                 .config("spark.rdd.compress", "true")
>                 .config("spark.shuffle.spill", "true")
>                 .getOrCreate()
> {code}
> Here are the config options I'm passing to spark-submit:
> {noformat}
> --conf "spark.executor.heartbeatInterval=400s"
> --conf "spark.speculation=true"
> --conf "spark.speculation.multiplier=30"
> --conf "spark.speculation.quantile=0.95"
> --conf "spark.memory.useLegacyMode=true"
> --conf "spark.shuffle.memoryFraction=0.8"
> --conf "spark.storage.memoryFraction=0.2"
> --driver-java-options "-XX:+UseG1GC"
> {noformat}
> I was able to find a workaround: copy your application JAR to each of the machines in your cluster, and pass the JAR's path to {{spark-submit}} with:
> {noformat}
> --conf "spark.driver.extraClassPath=/path/to/sparklogisticregre‌​ssion.jar"
> --conf "spark.executor.extraClassPath=/path/to/sparklogisticreg‌​ression.jar"
> {noformat}
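> Putting the workaround together with cluster mode, the {{spark-submit}} invocation looks roughly like this (master, class name, and paths are placeholders):
> {noformat}
> spark-submit \
>   --master yarn --deploy-mode cluster \
>   --class com.foo.bar.Main \
>   --conf "spark.driver.extraClassPath=/path/to/sparklogisticregression.jar" \
>   --conf "spark.executor.extraClassPath=/path/to/sparklogisticregression.jar" \
>   /path/to/sparklogisticregression.jar
> {noformat}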


