Posted to issues@spark.apache.org by "Xianjin YE (JIRA)" <ji...@apache.org> on 2018/01/11 09:30:01 UTC
[jira] [Created] (SPARK-23040) BlockStoreShuffleReader's returned Iterator isn't interruptible if aggregator or ordering is specified
Xianjin YE created SPARK-23040:
----------------------------------
Summary: BlockStoreShuffleReader's returned Iterator isn't interruptible if aggregator or ordering is specified
Key: SPARK-23040
URL: https://issues.apache.org/jira/browse/SPARK-23040
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.2.1, 2.2.0, 2.1.2, 2.1.1, 2.1.0, 2.0.2, 2.0.1, 2.0.0
Reporter: Xianjin YE
Priority: Minor
For example, if ordering is specified, the returned iterator is a CompletionIterator:
{code:java}
dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    // Create an ExternalSorter to sort the data.
    val sorter =
      new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter)
    context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
  case None =>
    aggregatedIter
}
{code}
However, the sorter consumes the aggregatedIter (which may be interruptible) inside sorter.insertAll, and then returns a fresh iterator from sorter.iterator that isn't interruptible.
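A minimal, self-contained illustration of how the per-element cancellation check disappears once an interruptible iterator is drained eagerly (plain Scala, no Spark dependencies; CancellableIterator is a made-up stand-in for Spark's InterruptibleIterator, not Spark code):
{code:java}
object LostInterruptibility extends App {
  // Stand-in for Spark's InterruptibleIterator: checks a kill flag per element.
  class CancellableIterator[T](cancelled: () => Boolean, delegate: Iterator[T])
      extends Iterator[T] {
    def hasNext: Boolean = {
      // The per-element cancellation check is what makes this interruptible.
      if (cancelled()) throw new InterruptedException("task killed")
      delegate.hasNext
    }
    def next(): T = delegate.next()
  }

  var killed = false
  val upstream = new CancellableIterator(() => killed, Iterator(3, 1, 2))

  // Draining and sorting eagerly (analogous to sorter.insertAll followed by
  // sorter.iterator) produces a plain iterator with no cancellation check.
  val sortedIter: Iterator[Int] = upstream.toList.sorted.iterator

  killed = true
  sortedIter.foreach(println) // still prints 1, 2, 3; the kill flag is ignored
}
{code}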
The problem is that the task then cannot be cancelled when its stage fails (unless interruptThread is enabled, which it is not by default), so the task keeps running and wastes executor resources.
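One possible direction for a fix (a sketch only, not necessarily the committed patch): wrap the final iterator in the existing org.apache.spark.InterruptibleIterator, so each hasNext call consults the TaskContext's kill flag. Here resultIter is an illustrative name for the iterator produced by the keyOrdering match above:
{code:java}
import org.apache.spark.InterruptibleIterator

// Sketch: suppose the result of the keyOrdering match above is bound to
// `resultIter` (illustrative name). Wrap it unless it is already interruptible.
resultIter match {
  case it: InterruptibleIterator[Product2[K, C]] =>
    // Already checks for task-kill requests on each hasNext.
    it
  case it =>
    // InterruptibleIterator consults the TaskContext before delegating, so a
    // killed task stops at the next record boundary instead of running the
    // sorted/aggregated output to completion.
    new InterruptibleIterator[Product2[K, C]](context, it)
}
{code}
This would mirror the wrapper that read() already applies to the raw shuffle-read iterator before any aggregation or ordering is layered on top.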