Posted to user@spark.apache.org by Jerry Vinokurov <gr...@gmail.com> on 2019/07/10 16:50:35 UTC

intermittent Kryo serialization failures in Spark

Hi all,

I am experiencing a strange intermittent failure of my Spark job that
results from serialization issues in Kryo. Here is the stack trace:

Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
> 	... 204 more
>
> (I've edited the company and model name since this is proprietary code)

This error does not surface every time the job is run; I would say it
probably shows up once in every 10 runs or so, and there isn't anything
about the input data that triggers it, since I've been able to
(nondeterministically) reproduce the error simply by rerunning the job with
the same inputs over and over again. The model itself is just a plain Scala
case class whose fields are strings and integers, so there's no custom
serialization logic or anything like that. As I understand it, this seems
related to an issue previously documented here
<https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly this was
fixed long ago. I'm running this job on an AWS EMR cluster and have
confirmed that the version of Spark running there is 2.4.0, with the patch
linked in the above issue being part of the code.

A suggested solution has been to set the extraClasspath config settings on
the driver and executor, but that has not fixed the problem. I'm out of
ideas for how to tackle this and would love to hear if anyone has any
suggestions or strategies for fixing this.
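[For reference, the settings being referred to are spark.driver.extraClassPath and
spark.executor.extraClassPath. A minimal sketch of how they are typically supplied,
with a purely hypothetical jar path; note that classpath options have to be in place
before the JVMs start, so they are normally passed at submit time rather than set
from inside a running job:]

```
import org.apache.spark.SparkConf

// Hypothetical sketch: point both the driver and the executors at the jar that
// contains the application classes (e.g. the one holding com.mycompany.models.MyModel).
// The path below is made up for illustration.
val conf = new SparkConf()
  .set("spark.driver.extraClassPath", "/opt/myapp/lib/my-models.jar")
  .set("spark.executor.extraClassPath", "/opt/myapp/lib/my-models.jar")

// Equivalent submit-time form:
//   spark-submit \
//     --conf spark.driver.extraClassPath=/opt/myapp/lib/my-models.jar \
//     --conf spark.executor.extraClassPath=/opt/myapp/lib/my-models.jar \
//     ...
```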

thanks,
Jerry

-- 
http://www.google.com/profiles/grapesmoker

Re: intermittent Kryo serialization failures in Spark

Posted by Jerry Vinokurov <gr...@gmail.com>.
Hi Julien,

Thanks for the suggestion. If we don't do a broadcast, that would
presumably affect the performance of the job, since the model that is failing
to be broadcast is something that needs to be shared across the cluster.
But it may be worth it if the alternative is the job not running properly.
Vadim's suggestions did not make a difference for me (I'm still hitting this
error several times a day), but I'll try disabling the broadcast and see if
that does anything.
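[As a point of reference, a minimal sketch of what "without the broadcast" could look
like, assuming the model is currently shared with an explicit sparkContext.broadcast
call; the model and function names here are hypothetical. If the model is simply
referenced in the closure instead, Spark serializes a copy of it with each task, which
costs more per task but avoids the broadcast path entirely:]

```
import org.apache.spark.sql.SparkSession

// Hypothetical stand-ins for the real model and scoring logic.
case class ExampleModel(name: String, threshold: Int)
def score(m: ExampleModel, value: Int): Boolean = value > m.threshold

val spark = SparkSession.builder().appName("no-broadcast-sketch").getOrCreate()
val model = ExampleModel("example", 10)
val rdd   = spark.sparkContext.parallelize(1 to 100)

// With a broadcast (roughly what the job presumably does today):
//   val bc = spark.sparkContext.broadcast(model)
//   rdd.map(x => score(bc.value, x))

// Without the broadcast: capture `model` directly in the closure.
val flags = rdd.map(x => score(model, x))
```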

thanks,
Jerry

On Fri, Sep 20, 2019 at 10:00 AM Julien Laurenceau <
julien.laurenceau@pepitedata.com> wrote:

> Hi,
> Did you try without the broadcast?
> Regards
> JL
>
> On Thu, Sep 19, 2019 at 06:41, Vadim Semenov <va...@datadoghq.com.invalid>
> wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(Class.forName("[[B")) // byte[][]
>>     kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <gr...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>> 	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>> 	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>> 	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>> 	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>> 	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>> 	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>> 	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>> 	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>>> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> 	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>>>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>>>> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>>> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>>> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>>> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>>> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>>>         [our code that writes data to CSV]	
>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> 	at java.lang.Class.forName0(Native Method)
>>>> 	at java.lang.Class.forName(Class.java:348)
>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>> 	... 132 more

-- 
http://www.google.com/profiles/grapesmoker

Re: intermittent Kryo serialization failures in Spark

Posted by Julien Laurenceau <ju...@pepitedata.com>.
Hi,
Did you try without the broadcast?
Regards
JL


Re: intermittent Kryo serialization failures in Spark

Posted by Vadim Semenov <va...@datadoghq.com.INVALID>.
I remember it not working for us when we were setting it from the inside;
we needed to actually pass it in
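[A hedged reading of this: setting the Kryo options from inside the application via
SparkConf reportedly did not take effect for them, and the same options had to be
passed to spark-submit instead. A rough sketch, with a hypothetical registrator class
name:]

```
import org.apache.spark.SparkConf

// Setting the options from inside the job (the pattern that reportedly did not work):
val conf = new SparkConf()
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator") // hypothetical class
  .set("spark.kryoserializer.buffer.max", "256m")

// Passing the same options at submit time (the pattern that reportedly did work):
//   spark-submit \
//     --conf spark.kryo.registrator=com.example.MyKryoRegistrator \
//     --conf spark.kryoserializer.buffer.max=256m \
//     ...
```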


-- 
Sent from my iPhone

Re: intermittent Kryo serialization failures in Spark

Posted by Jerry Vinokurov <gr...@gmail.com>.
Hi Vadim,

Thanks for your suggestion. We do preregister the classes, like so:

> object KryoRegistrar {
>   val classesToRegister: Array[Class[_]] = Array(
>     classOf[MyModel],
>     [etc]
>   )
> }
>

And then we do:

> val sparkConf = new SparkConf()
>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>

I notice that this is a bit different from your code, and I'm wondering
whether there's any functional difference or if these are two ways to get
to the same end. Our code is directly adapted from the Spark documentation
on how to use the Kryo serializer, but maybe there's some subtlety here that
I'm missing.
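[To sketch the difference, hedged and with a hypothetical registrator name:
registerKryoClasses records the class names in the spark.kryo.classesToRegister
property, and Spark later looks each name up with Class.forName when it builds a
Kryo instance, which appears to be the call that is failing in the trace above.
A spark.kryo.registrator instead names a class that Spark instantiates and hands
the Kryo instance to, so it registers Class objects directly:]

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Style 1: register by listing the classes; Spark stores the names in
// spark.kryo.classesToRegister and resolves them with Class.forName later.
val conf1 = new SparkConf()
  .registerKryoClasses(Array(Class.forName("[[B"))) // byte[][], as in the snippet above

// Style 2: a registrator that Spark instantiates (it must be a top-level class on the
// classpath); it registers the classes itself. The name here is hypothetical.
class ExampleKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("[[B")) // byte[][]
  }
}
val conf2 = new SparkConf()
  .set("spark.kryo.registrator", classOf[ExampleKryoRegistrator].getName)
```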

With regard to the settings, it looks like we currently have the default
settings, which is to say that referenceTracking is true,
registrationRequired is false, unsafe is false, and buffer.max is 64m (none
of our objects are anywhere near that size but... who knows). I will try it
with your suggestions and see if it solves the problem.

thanks,
Jerry

On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov <va...@datadoghq.com> wrote:

> Pre-register your classes:
>
> ```
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
>     kryo.register(Class.forName("[[B")) // byte[][]
>     kryo.register(classOf[java.lang.Class[_]])
>   }
> }
> ```
>
> then run with
>
> 'spark.kryo.referenceTracking': 'false',
> 'spark.kryo.registrationRequired': 'false',
> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
> 'spark.kryo.unsafe': 'false',
> 'spark.kryoserializer.buffer.max': '256m',
>
> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <gr...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> Posted this some time ago but the problem continues to bedevil us. I'm
>> including a (slightly edited) stack trace that results from this error. If
>> anyone can shed any light on what exactly is happening here and what we can
>> do to avoid it, that would be much appreciated.
>>
>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>> 	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>> 	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>> 	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>> 	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>> 	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>> 	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>> 	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> 	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>> 	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>> 	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>> 	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>> 	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>> 	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>>> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>>         [our code that writes data to CSV]	
>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> 	at java.lang.Class.forName0(Native Method)
>>> 	at java.lang.Class.forName(Class.java:348)
>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>> 	... 132 more
>>>
>>>
>> On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <gr...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am experiencing a strange intermittent failure of my Spark job that
>>> results from serialization issues in Kryo. Here is the stack trace:
>>>
>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> 	at java.lang.Class.forName0(Native Method)
>>>> 	at java.lang.Class.forName(Class.java:348)
>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>> 	... 204 more
>>>>
>>>> (I've edited the company and model name since this is proprietary code)
>>>
>>> This error does not surface every time the job is run; I would say it
>>> probably shows up once in every 10 runs or so, and there isn't anything
>>> about the input data that triggers this, as I've been able to
>>> (nondeterministically) reproduce the error by simply rerunning the job with
>>> the same inputs over and over again. The model itself is just a plain Scala
>>> case class whose fields are strings and integers, so there's no custom
>>> serialization logic or anything like that. As I understand, this is seems
>>> related to an issue previously documented here
>>> <https://issues.apache.org/jira/browse/SPARK-21928>but allegedly this
>>> was fixed long ago. I'm running this job on an AWS EMR cluster and have
>>> confirmed that the version of Spark running there is 2.4.0, with the patch
>>> that is linked in the above issue being part of the code.
>>>
>>> A suggested solution has been to set the extraClasspath config settings
>>> on the driver and executor, but that has not fixed the problem. I'm out of
>>> ideas for how to tackle this and would love to hear if anyone has any
>>> suggestions or strategies for fixing this.
>>>
>>> thanks,
>>> Jerry
>>>
>>> --
>>> http://www.google.com/profiles/grapesmoker
>>>
>>
>>
>> --
>> http://www.google.com/profiles/grapesmoker
>>
>
>
> --
> Sent from my iPhone
>


-- 
http://www.google.com/profiles/grapesmoker

Re: intermittent Kryo serialization failures in Spark

Posted by Vadim Semenov <va...@datadoghq.com.INVALID>.
Pre-register your classes:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("[[B")) // byte[][]
    kryo.register(classOf[java.lang.Class[_]])
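    // (not part of the original suggestion) you could presumably also register
    // the application's own model classes here, e.g. the redacted class from
    // the stack trace, so a missing class fails fast at startup rather than
    // intermittently at broadcast time:
    // kryo.register(Class.forName("com.mycompany.models.MyModel"))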
  }
}
```

then run with

'spark.kryo.referenceTracking': 'false',
'spark.kryo.registrationRequired': 'false',
'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
'spark.kryo.unsafe': 'false',
'spark.kryoserializer.buffer.max': '256m',
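
For completeness, a minimal sketch of how these settings might be wired up
from application code rather than a config file; the registrator class name
and app name below are placeholders, and on EMR the same keys can equally be
passed as --conf flags to spark-submit or through a cluster configuration
classification:

```
import org.apache.spark.sql.SparkSession

// Sketch only: builds a session with the Kryo settings suggested above.
// "com.mycompany.spark.MyKryoRegistrator" is a placeholder for wherever the
// registrator class actually lives in your jar. spark.serializer is included
// for completeness; the failing job is evidently already running with Kryo.
val spark = SparkSession.builder()
  .appName("my-job")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "com.mycompany.spark.MyKryoRegistrator")
  .config("spark.kryo.referenceTracking", "false")
  .config("spark.kryo.registrationRequired", "false")
  .config("spark.kryo.unsafe", "false")
  .config("spark.kryoserializer.buffer.max", "256m")
  .getOrCreate()
```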

On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <gr...@gmail.com>
wrote:

> Hi folks,
>
> Posted this some time ago but the problem continues to bedevil us. I'm
> including a (slightly edited) stack trace that results from this error. If
> anyone can shed any light on what exactly is happening here and what we can
> do to avoid it, that would be much appreciated.
>
> org.apache.spark.SparkException: Failed to register classes with Kryo
>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>> 	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>> 	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>> 	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>> 	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>> 	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>> 	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>> 	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> 	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>> 	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>> 	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>> 	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>> 	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>> 	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>         [our code that writes data to CSV]	
>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> 	at java.lang.Class.forName0(Native Method)
>> 	at java.lang.Class.forName(Class.java:348)
>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>> 	... 132 more
>>
>>
> On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <gr...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am experiencing a strange intermittent failure of my Spark job that
>> results from serialization issues in Kryo. Here is the stack trace:
>>
>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> 	at java.lang.Class.forName0(Native Method)
>>> 	at java.lang.Class.forName(Class.java:348)
>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>> 	... 204 more
>>>
>>> (I've edited the company and model name since this is proprietary code)
>>
>> This error does not surface every time the job is run; I would say it
>> probably shows up once in every 10 runs or so, and there isn't anything
>> about the input data that triggers this, as I've been able to
>> (nondeterministically) reproduce the error by simply rerunning the job with
>> the same inputs over and over again. The model itself is just a plain Scala
>> case class whose fields are strings and integers, so there's no custom
>> serialization logic or anything like that. As I understand, this is seems
>> related to an issue previously documented here
>> <https://issues.apache.org/jira/browse/SPARK-21928>but allegedly this
>> was fixed long ago. I'm running this job on an AWS EMR cluster and have
>> confirmed that the version of Spark running there is 2.4.0, with the patch
>> that is linked in the above issue being part of the code.
>>
>> A suggested solution has been to set the extraClasspath config settings
>> on the driver and executor, but that has not fixed the problem. I'm out of
>> ideas for how to tackle this and would love to hear if anyone has any
>> suggestions or strategies for fixing this.
>>
>> thanks,
>> Jerry
>>
>> --
>> http://www.google.com/profiles/grapesmoker
>>
>
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 
Sent from my iPhone

Re: intermittent Kryo serialization failures in Spark

Posted by Jerry Vinokurov <gr...@gmail.com>.
Hi folks,

Posted this some time ago but the problem continues to bedevil us. I'm
including a (slightly edited) stack trace that results from this error. If
anyone can shed any light on what exactly is happening here and what we can
do to avoid it, that would be much appreciated.

org.apache.spark.SparkException: Failed to register classes with Kryo
> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
> 	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
> 	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
> 	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
> 	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
> 	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
> 	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
> 	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> 	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
> 	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
> 	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
> 	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
> 	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
> 	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>         [our code that writes data to CSV]	
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
> 	... 132 more
>
>
On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <gr...@gmail.com>
wrote:

> Hi all,
>
> I am experiencing a strange intermittent failure of my Spark job that
> results from serialization issues in Kryo. Here is the stack trace:
>
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> 	at java.lang.Class.forName0(Native Method)
>> 	at java.lang.Class.forName(Class.java:348)
>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>> 	... 204 more
>>
>> (I've edited the company and model name since this is proprietary code)
>
> This error does not surface every time the job is run; I would say it
> probably shows up once in every 10 runs or so, and there isn't anything
> about the input data that triggers this, as I've been able to
> (nondeterministically) reproduce the error by simply rerunning the job with
> the same inputs over and over again. The model itself is just a plain Scala
> case class whose fields are strings and integers, so there's no custom
> serialization logic or anything like that. As I understand, this is seems
> related to an issue previously documented here
> <https://issues.apache.org/jira/browse/SPARK-21928>but allegedly this was
> fixed long ago. I'm running this job on an AWS EMR cluster and have
> confirmed that the version of Spark running there is 2.4.0, with the patch
> that is linked in the above issue being part of the code.
>
> A suggested solution has been to set the extraClasspath config settings on
> the driver and executor, but that has not fixed the problem. I'm out of
> ideas for how to tackle this and would love to hear if anyone has any
> suggestions or strategies for fixing this.
>
> thanks,
> Jerry
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 
http://www.google.com/profiles/grapesmoker