Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/01/19 12:27:34 UTC

[jira] [Commented] (SPARK-5316) DAGScheduler may make shuffleToMapStage leak if getParentStages fails

    [ https://issues.apache.org/jira/browse/SPARK-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282416#comment-14282416 ] 

Apache Spark commented on SPARK-5316:
-------------------------------------

User 'YanTangZhai' has created a pull request for this issue:
https://github.com/apache/spark/pull/4105

> DAGScheduler may make shuffleToMapStage leak if getParentStages fails
> ---------------------------------------------------------------------
>
>                 Key: SPARK-5316
>                 URL: https://issues.apache.org/jira/browse/SPARK-5316
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: YanTang Zhai
>            Priority: Minor
>
> DAGScheduler may make shuffleToMapStage leak if getParentStages fails.
> If getParentStages throws an exception, for example because an input path does not exist, DAGScheduler fails to handle the job submission. By that point getParentStages may already have added records to shuffleToMapStage, and those records are never cleaned up.
> A simple job that shows the problem:
> {code:java}
> val inputFile1 = ... // Input path does not exist when this job is submitted
> val inputFile2 = ...
> val outputFile = ...
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val rdd1 = sc.textFile(inputFile1)
>                     .flatMap(line => line.split(" "))
>                     .map(word => (word, 1))
>                     .reduceByKey(_ + _, 1)
> val rdd2 = sc.textFile(inputFile2)
>                     .flatMap(line => line.split(","))
>                     .map(word => (word, 1))
>                     .reduceByKey(_ + _, 1)
> try {
>   val rdd3 = new PairRDDFunctions(rdd1).join(rdd2, 1)
>   rdd3.saveAsTextFile(outputFile)
> } catch {
>   case e: Exception =>
>     // Spark's logError takes a message and, optionally, the throwable
>     logError("Job failed", e)
> }
> // print the information of DAGScheduler's shuffleToMapStage to check
> // whether it still has uncleaned records.
> ...
> {code}
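
The leak pattern described above can be sketched without Spark. What follows is a toy model under stated assumptions, not the actual DAGScheduler code and not the change made in the pull request: the object ShuffleLeakSketch, the methods submitLeaky and submitWithCleanup, and the (shuffleId, inputExists) input shape are all hypothetical. Only the names shuffleToMapStage and getParentStages are borrowed from the issue text. The sketch shows a map that keeps entries registered before a mid-way failure, and one possible way to roll them back.

```scala
import scala.collection.mutable

// Toy model only: NOT Spark's DAGScheduler. Names mirror the issue text for readability.
object ShuffleLeakSketch {
  // Stand-in for DAGScheduler's shuffleToMapStage (shuffleId -> stage description).
  val shuffleToMapStage = mutable.HashMap[Int, String]()

  // Hypothetical parent lookup: registers each shuffle as it goes and throws
  // when it reaches a shuffle whose input does not exist.
  def getParentStages(shuffles: Seq[(Int, Boolean)]): Unit =
    shuffles.foreach { case (shuffleId, inputExists) =>
      if (!inputExists)
        throw new IllegalArgumentException(s"Input path does not exist for shuffle $shuffleId")
      shuffleToMapStage(shuffleId) = s"stage-for-shuffle-$shuffleId"
    }

  // Leaky submission: the failure is handled, but entries added before it remain.
  def submitLeaky(shuffles: Seq[(Int, Boolean)]): Boolean =
    try { getParentStages(shuffles); true }
    catch { case _: IllegalArgumentException => false }

  // One possible cleanup: remember the keys present beforehand and, on failure,
  // remove every key that this submission added.
  def submitWithCleanup(shuffles: Seq[(Int, Boolean)]): Boolean = {
    val before = shuffleToMapStage.keySet.toSet
    try { getParentStages(shuffles); true }
    catch {
      case _: IllegalArgumentException =>
        val added = shuffleToMapStage.keySet.toSet -- before
        added.foreach(id => shuffleToMapStage.remove(id))
        false
    }
  }
}
```

In this sketch, calling submitLeaky(Seq((2, true), (1, false))) returns false and leaves the entry for shuffle 2 in the map, which is the kind of leftover record the issue describes. The same input passed to submitWithCleanup on an empty map leaves the map empty. The order of the two shuffles is chosen only so that one entry is registered before the failure.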


