You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2017/07/08 07:29:00 UTC

[jira] [Resolved] (SPARK-21347) Performance issues with KryoSerializer

     [ https://issues.apache.org/jira/browse/SPARK-21347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-21347.
-------------------------------
    Resolution: Not A Problem

I think this is more of question, and about a change or configuration in Kryo.

> Performance issues with KryoSerializer
> --------------------------------------
>
>                 Key: SPARK-21347
>                 URL: https://issues.apache.org/jira/browse/SPARK-21347
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Vishwanath Lakkundi
>
> We have a weird case of a performance problem in Spark(2.1.0) with Kryo(3.3)
> In our case the tasks are doing a shuffle. All of them finish, but one of the tasks keeps chugging along for hours. 
> In the end it processes a comparable number of rows and also produces a comparable number of rows as its counterparts taking mere seconds. So why is this task taking so much longer? Here is the stack we always see t for the slow task:
> "Executor task launch worker for task 871263" #319 daemon prio=5 os_prio=0 tid=0x00007f4e22009800 nid=0x2043 runnable [0x00007f4e4947a000]
>    java.lang.Thread.State: RUNNABLE
>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:382)
>         at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:65)
>         at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:865)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:797)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
>         at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:159)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:507)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:527)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$SpillableIterator.readNext(ExternalAppendOnlyMap.scala:590)
>         - locked <0x00000003cf400118> (a java.lang.Object)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$SpillableIterator.next(ExternalAppendOnlyMap.scala:601)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$SpillableIterator.next(ExternalAppendOnlyMap.scala:562)
>         at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>         at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.scheduler.Task.run(Task.scala:99)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> It seems that it always reports this line in IdentityObjectIntMap.clear():
> public void clear () {
>    K[] keyTable = this.keyTable;
>    for (int i = capacity + stashSize; i-- > 0;)
>       keyTable[i] = null;   // <<<<<<<<<<<<< this line
>    size = 0;
>    stashSize = 0;
> }
> Since autoReset is enabled (by default in Kryo) the reset() is called at the end of each serialization/deserialization attempt, for example at Kryo.readClassAndObject(). Thats what seems to be triggering the reset per the above stack trace.
> is there way to avoid this expensive reset ? Could spark use KryoPool in KryoSerializer.borrowKryo() and operate on autoReset disabled in Kryo perhaps  ? (Not sure how this might impact supporting serialized shuffles.)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org