Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:04:35 UTC

[jira] [Updated] (SPARK-19685) PipedRDD tasks should not hang on interruption / errors

     [ https://issues.apache.org/jira/browse/SPARK-19685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19685:
---------------------------------
    Labels: bulk-closed  (was: )

> PipedRDD tasks should not hang on interruption / errors
> -------------------------------------------------------
>
>                 Key: SPARK-19685
>                 URL: https://issues.apache.org/jira/browse/SPARK-19685
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0
>            Reporter: Josh Rosen
>            Priority: Major
>              Labels: bulk-closed
>
> While looking at WARN- and ERROR-level logs from Spark executors, I spotted a problem where PipedRDD tasks may continue running after being cancelled or after failing. Specifically, I saw many cancelled tasks hanging in the following two stacks:
> {code}
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> java.lang.UNIXProcess.destroy(UNIXProcess.java:445)
> java.lang.UNIXProcess.destroy(UNIXProcess.java:478)
> org.apache.spark.rdd.PipedRDD$$anon$1.propagateChildException(PipedRDD.scala:203)
> org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:183)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
> scala.collection.AbstractIterator.fold(Iterator.scala:1336)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and 
> {code}
> java.io.FileInputStream.readBytes(Native Method)
> java.io.FileInputStream.read(FileInputStream.java:255)
> java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
> java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> java.io.BufferedReader.fill(BufferedReader.java:161)
> java.io.BufferedReader.readLine(BufferedReader.java:324)
> java.io.BufferedReader.readLine(BufferedReader.java:389)
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
> org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:172)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
> scala.collection.AbstractIterator.fold(Iterator.scala:1336)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> I do not have a minimal reproduction of this issue yet, but I suspect that we can build one by having PipedRDD call a process which hangs indefinitely without printing any output, then cancelling the Spark job with {{interruptOnCancel=true}}. If my hunch is right, we should see the PipedRDD tasks continue to run, either because the call to destroy the child process hangs or because we don't check whether the task has been interrupted.
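
The reproduction proposed above could be sketched roughly as follows. This is an untested sketch, not a confirmed reproduction: the object name, the "piped-hang" group id, the `sleep 600` command, the `local[2]` master, and the wait durations are all illustrative assumptions, and whether the hang appears may depend on the Spark version and platform. It uses only the public `setJobGroup`, `cancelJobGroup`, and `RDD.pipe` calls.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PipedRddCancelRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SPARK-19685-repro").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val submitter = new Thread(new Runnable {
        override def run(): Unit = {
          // Job groups are per-thread, so set the group on the thread that submits the job.
          sc.setJobGroup("piped-hang", "SPARK-19685 repro", interruptOnCancel = true)
          try {
            // `sleep 600` prints nothing, so each task blocks waiting for the child's output.
            sc.parallelize(1 to 2, 2).pipe("sleep 600").count()
          } catch {
            case e: Exception => println(s"Job ended with: ${e.getMessage}")
          }
        }
      })
      submitter.start()
      Thread.sleep(5000)               // crude wait for the tasks to start (assumption)
      sc.cancelJobGroup("piped-hang")
      submitter.join(30000)
      Thread.sleep(5000)               // give cancelled tasks a chance to exit

      // Any task or helper thread still inside PipedRDD code outlived the cancellation.
      val stuck = Thread.getAllStackTraces.keySet.toArray(new Array[Thread](0))
        .filter(_.getStackTrace.exists(_.getClassName.contains("PipedRDD")))
      stuck.foreach(t => println(s"Still running: ${t.getName}"))
      println(s"${stuck.length} thread(s) still in PipedRDD after cancellation")
    } finally {
      sc.stop()
    }
  }
}
```

If the hunch in the description holds, the final count would be non-zero and a thread dump of the lingering threads should resemble one of the two stacks quoted above.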

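One plausible reading of the second stack is that the task thread is parked in a blocking `java.io` read of the child's stdout, which `Thread.interrupt()` does not wake. The sketch below illustrates that JVM behaviour outside Spark. It assumes a Unix-like system with `sleep` on the PATH; the object name and timings are hypothetical, and it is not taken from the PipedRDD source.

```scala
import java.util.concurrent.TimeUnit

object BlockingReadDemo {
  /** Returns (readerStillBlockedAfterInterrupt, readerExitedAfterDestroy). */
  def run(): (Boolean, Boolean) = {
    // The child never writes to stdout, so a read on its output blocks.
    val child = new ProcessBuilder("sleep", "30").start()
    val reader = new Thread(new Runnable {
      override def run(): Unit = {
        try child.getInputStream.read()               // blocks until output or EOF
        catch { case _: java.io.IOException => () }   // stream closed underneath us
      }
    })
    reader.setDaemon(true)
    reader.start()
    Thread.sleep(500)        // give the reader time to block in read()
    reader.interrupt()       // roughly what an interrupting task kill does to the thread
    reader.join(1000)
    val stillBlocked = reader.isAlive
    child.destroy()          // once the child exits, its end of the pipe closes
    child.waitFor(5, TimeUnit.SECONDS)
    reader.join(5000)
    (stillBlocked, !reader.isAlive)
  }

  def main(args: Array[String]): Unit = {
    val (stillBlocked, exited) = run()
    println(s"blocked after interrupt: $stillBlocked, exited after destroy: $exited")
  }
}
```

In this sketch the reader only returns once the child process is gone, which is consistent with the description's suggestion that the task needs either a successful destroy of the child or an explicit interruption check.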


