Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/03 08:25:00 UTC
[jira] [Updated] (SPARK-27614) Executor shuffle fetch hang
[ https://issues.apache.org/jira/browse/SPARK-27614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-27614:
---------------------------------
Description:
Most of the tasks complete, but a few individual tasks show an unusually long duration and make no progress at all.
The corresponding executor hits a connection timeout, and the stack trace shows it hanging in ShuffleBlockFetcherIterator.next.
The corresponding code is as follows:
{code}
while (!isZombie && result == null) {
  val startFetchWait = System.nanoTime()
  result = results.take()
  val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
  shuffleMetrics.incFetchWaitTime(fetchWaitTime)
  // ...
}
{code}
LinkedBlockingQueue.take() blocks indefinitely if no result ever arrives. We can use poll with a timeout instead. The modified code is as follows:
{code}
currentResult =
  if (!this.blockManager.conf.getBoolean("spark.shuffle.fetch.timeout.enable", true)) {
    results.take()
  } else {
    logInfo("set spark.shuffle.fetch.timeout.enable=true.")
    val GB = 1L << 30
    val MB = 1L << 20
    // Scale the timeout with the amount of shuffle data still in flight.
    val (waitTime, unit) =
      if (bytesInFlight >= 2 * GB) (2, TimeUnit.HOURS)
      else if (bytesInFlight >= GB) (1, TimeUnit.HOURS)
      else if (bytesInFlight >= 512 * MB) (45, TimeUnit.MINUTES)
      else if (bytesInFlight >= 200 * MB) (30, TimeUnit.MINUTES)
      else if (bytesInFlight >= 100 * MB) (20, TimeUnit.MINUTES)
      else if (bytesInFlight >= 10 * MB) (15, TimeUnit.MINUTES)
      else (10, TimeUnit.MINUTES)
    // poll returns null on timeout, where take() would block forever.
    val r = results.poll(waitTime, unit)
    if (r == null) {
      val msg = s"Waited $waitTime ${unit.toString.toLowerCase} for a shuffle block, giving up!"
      logError(msg)
      throw new SparkException(msg)
    }
    r
  }
{code}
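For illustration, here is a minimal, self-contained Java sketch of the same bounded-wait pattern on java.util.concurrent.LinkedBlockingQueue. The tier thresholds mirror the proposal above; the class and method names are hypothetical, and Spark's FetchResult type, logging, and configuration plumbing are omitted:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedFetchWait {
    static final long MB = 1L << 20;
    static final long GB = 1L << 30;

    /** Timeout in minutes, scaled by how many bytes are still in flight. */
    static long timeoutMinutes(long bytesInFlight) {
        if (bytesInFlight >= 2 * GB) return 120;
        if (bytesInFlight >= GB) return 60;
        if (bytesInFlight >= 512 * MB) return 45;
        if (bytesInFlight >= 200 * MB) return 30;
        if (bytesInFlight >= 100 * MB) return 20;
        if (bytesInFlight >= 10 * MB) return 15;
        return 10;
    }

    /** Wait for the next result, failing loudly instead of hanging forever. */
    static String nextResult(LinkedBlockingQueue<String> results, long bytesInFlight)
            throws InterruptedException {
        long minutes = timeoutMinutes(bytesInFlight);
        // poll() returns null on timeout, where take() would block indefinitely.
        String r = results.poll(minutes, TimeUnit.MINUTES);
        if (r == null) {
            throw new RuntimeException(
                "Waited " + minutes + " minutes for a shuffle block, giving up!");
        }
        return r;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();
        results.put("block-0");
        // Returns immediately because the queue is non-empty; prints block-0.
        System.out.println(nextResult(results, 50 * MB));
    }
}
```

An empty queue would make nextResult throw after the tiered timeout, turning a silent hang into a retriable task failure.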
> Executor shuffle fetch hang
> ---------------------------
>
> Key: SPARK-27614
> URL: https://issues.apache.org/jira/browse/SPARK-27614
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.1, 2.4.0
> Reporter: weDataSphere
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org