You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Cheng Pan (Jira)" <ji...@apache.org> on 2021/12/31 04:02:00 UTC

[jira] [Created] (SPARK-37793) Invalid LocalMergedBlockData cause task hang

Cheng Pan created SPARK-37793:
---------------------------------

             Summary: Invalid LocalMergedBlockData cause task hang
                 Key: SPARK-37793
                 URL: https://issues.apache.org/jira/browse/SPARK-37793
             Project: Spark
          Issue Type: Bug
          Components: Shuffle
    Affects Versions: 3.3.0
            Reporter: Cheng Pan


When enable push-based shuffle, there is a chance that task hang

 
{code:java}
59	Executor task launch worker for task 424.0 in stage 753.0 (TID 106778)	WAITING	Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1660371198})
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:753)
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_1$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$518/852390142.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
{code}

ShuffleBlockFetcherIterator.scala:753
{code:java}
    while (result == null) {
      val startFetchWait = System.nanoTime()
753>  result = results.take()
      val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
      shuffleMetrics.incFetchWaitTime(fetchWaitTime)
      ..
    }
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org