Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2017/07/27 15:03:00 UTC
[jira] [Resolved] (SPARK-21319)
UnsafeExternalRowSorter.RowComparator memory leak
[ https://issues.apache.org/jira/browse/SPARK-21319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-21319.
---------------------------------
Resolution: Fixed
Fix Version/s: 2.3.0
Issue resolved by pull request 18679
[https://github.com/apache/spark/pull/18679]
> UnsafeExternalRowSorter.RowComparator memory leak
> -------------------------------------------------
>
> Key: SPARK-21319
> URL: https://issues.apache.org/jira/browse/SPARK-21319
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
> Reporter: James Baker
> Fix For: 2.3.0
>
> Attachments: 0001-SPARK-21319-Fix-memory-leak-in-UnsafeExternalRowSort.patch, hprof.png
>
>
> When we wish to sort within partitions, we produce an UnsafeExternalRowSorter. This contains an UnsafeExternalSorter, which in turn holds the UnsafeExternalRowSorter.RowComparator.
> The UnsafeExternalSorter adds a task completion listener which performs any additional required cleanup. Because that listener keeps the sorter reachable, the upshot is that we maintain a reference to the UnsafeExternalRowSorter.RowComparator until the end of the task.
> The RowComparator looks like
> {code:java}
> private static final class RowComparator extends RecordComparator {
>   private final Ordering<InternalRow> ordering;
>   private final int numFields;
>   private final UnsafeRow row1;
>   private final UnsafeRow row2;
>
>   RowComparator(Ordering<InternalRow> ordering, int numFields) {
>     this.numFields = numFields;
>     this.row1 = new UnsafeRow(numFields);
>     this.row2 = new UnsafeRow(numFields);
>     this.ordering = ordering;
>   }
>
>   @Override
>   public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
>     // TODO: Why are the sizes -1?
>     row1.pointTo(baseObj1, baseOff1, -1);
>     row2.pointTo(baseObj2, baseOff2, -1);
>     return ordering.compare(row1, row2);
>   }
> }
> {code}
> which means that the comparator keeps references (through row1 and row2) to the last baseObjs that were passed to compare(), and these references are not tracked for the purposes of memory allocation.
> We have a job which sorts within partitions and then coalesces partitions. It has a tendency to OOM because the references to old UnsafeRows that were used during the sorting keep that memory reachable.
> Attached is a screenshot of a memory dump during a task - our JVM has two executor threads.
> It can be seen that we have 2 references inside of row iterators, and 11 more which are only known in the task completion listener or as part of memory management.
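
The retention pattern described in the quoted report can be reproduced in isolation. The following is a minimal sketch, not the patch from pull request 18679 and not Spark code: RowView, SketchRowComparator, release() and the listener list are hypothetical stand-ins for UnsafeRow, RowComparator and the task completion listener, and the sketch assumes each base object is a plain long[] indexed by the offset. It shows that the reusable row views still point at the last base objects after compare() returns, and that one possible mitigation is to clear them once sorting is done.

```java
import java.util.ArrayList;
import java.util.List;

public class ComparatorRetentionSketch {

    /** Hypothetical stand-in for UnsafeRow: a reusable view over a base object and offset. */
    static final class RowView {
        Object base;
        long offset;

        void pointTo(Object base, long offset) {
            this.base = base;
            this.offset = offset;
        }

        long key() {
            // Assumption for this sketch only: the base object is a long[] and
            // the offset is an index into it.
            return ((long[]) base)[(int) offset];
        }
    }

    /** Mirrors the shape of the quoted RowComparator: two long-lived views, re-pointed per call. */
    static final class SketchRowComparator {
        final RowView row1 = new RowView();
        final RowView row2 = new RowView();

        int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
            row1.pointTo(baseObj1, baseOff1);
            row2.pointTo(baseObj2, baseOff2);
            // After this returns, row1 and row2 still reference baseObj1 and baseObj2.
            return Long.compare(row1.key(), row2.key());
        }

        /** Hypothetical cleanup hook: drop the retained references once sorting is finished. */
        void release() {
            row1.pointTo(null, 0L);
            row2.pointTo(null, 0L);
        }
    }

    /** Hypothetical stand-in for a task-scoped listener list that keeps its targets reachable. */
    static final List<Runnable> taskCompletionListeners = new ArrayList<>();

    public static void main(String[] args) {
        long[] pageA = {3L, 1L};
        long[] pageB = {2L, 5L};

        SketchRowComparator comparator = new SketchRowComparator();
        // The listener captures the comparator, so the comparator (and whatever its
        // row views point at) stays reachable until the listeners are run or dropped.
        taskCompletionListeners.add(comparator::release);

        int result = comparator.compare(pageA, 0L, pageB, 0L);
        System.out.println(result > 0);                     // prints true (3 > 2)
        System.out.println(comparator.row1.base == pageA);  // prints true: pageA is still referenced

        comparator.release();
        System.out.println(comparator.row1.base == null);   // prints true: reference dropped
    }
}
```

Clearing the views in a cleanup hook is only one option. Another would be to construct a fresh comparator for each sort so that no long-lived object holds on to the last compared records.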