You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "jeanlyn (JIRA)" <ji...@apache.org> on 2015/08/17 09:29:45 UTC

[jira] [Commented] (SPARK-8513) _temporary may be left undeleted when a write job committed with FileOutputCommitter fails due to a race condition

    [ https://issues.apache.org/jira/browse/SPARK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699117#comment-14699117 ] 

jeanlyn commented on SPARK-8513:
--------------------------------

I think i encountered the problem these day. Our job failed due to {{_temporary}} left, and when using the hive api update partitions will throw exception if it has nested directory.
{noformat}
2015-08-12 07:07:07 INFO org.apache.hadoop.hive.ql.metadata.HiveException: checkPaths: hdfs://ns1/tmp/hive-dd_edw/hive_2015-08-12_07-02-20_902_7762418154833191311-1/-ext-10000 has nested directoryhdfs://ns1/tmp/hive-dd_edw/hive_2015-08-12_07-02-20_902_7762418154833191311-1/-ext-10000/_temporary
at org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2080)
at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2270)
at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1222)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:233)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:124)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:266)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1140)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1140)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:97)
at org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:57)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:273)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:507)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:442)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:148)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:619)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}

In our case, all of our task had finished except the speculative task.
{code}
15/08/12 07:07:06 INFO TaskSetManager: Marking task 6 in stage 40.0 (on BJHC-HERA-17163.hadoop.local) as speculatable because it ran more than 33639 ms

(speculative task)******15/08/12 07:07:06 INFO TaskSetManager: Starting task 6.1 in stage 40.0 (TID 165, BJHC-HERA-16580.hadoop.local, PROCESS_LOCAL, 1687 bytes)*********

15/08/12 07:07:06 INFO BlockManagerInfo: Added broadcast_60_piece0 in memory on BJHC-HERA-16580.hadoop.local:48182 (size: 740.2 KB, free: 2.1 GB)
15/08/12 07:07:06 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to BJHC-HERA-16580.hadoop.local:9208
15/08/12 07:07:07 INFO TaskSetManager: Finished task 6.0 in stage 40.0 (TID 161) in 34449 ms on BJHC-HERA-17163.hadoop.local (10/10)
15/08/12 07:07:07 INFO DAGScheduler: ResultStage 40 (runJob at InsertIntoHiveTable.scala:83) finished in 34.457 s
{code}
However, i can not find any code to cancel the speculative task. So, if we want to fix this issue, do we also need to add the cancel logic(kill the speculative tasks) before making task cancellation synchronous when job finished?


> _temporary may be left undeleted when a write job committed with FileOutputCommitter fails due to a race condition
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8513
>                 URL: https://issues.apache.org/jira/browse/SPARK-8513
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 1.2.2, 1.3.1, 1.4.0
>            Reporter: Cheng Lian
>
> To reproduce this issue, we need a node with relatively more cores, say 32 (e.g., Spark Jenkins builder is a good candidate).  With such a node, the following code should be relatively easy to reproduce this issue:
> {code}
> sqlContext.range(0, 10).repartition(32).select('id / 0).write.mode("overwrite").parquet("file:///tmp/foo")
> {code}
> You may observe similar log lines as below:
> {noformat}
> 01:58:27.682 pool-1-thread-1-ScalaTest-running-CommitFailureTestRelationSuite WARN FileUtil: Failed to delete file or dir [/home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-a918b285-fa59-4a29-857e-a95e38fa355a/_temporary/0/_temporary]: it still exists.
> {noformat}
> The reason is that, for a Spark job with multiple tasks, when a task fails after multiple retries, the job gets canceled on driver side.  At the same time, all child tasks of this job also get canceled.  However, task cancelation is asynchronous.  This means, some tasks may still be running when the job is already killed on driver side.
> With this in mind, the following execution order may cause the log line mentioned above:
> # Job {{A}} spawns 32 tasks to write the Parquet file
>   Since {{ParquetOutputCommitter}} is a subclass of {{FileOutputClass}}, a temporary directory {{D1}} is created to hold output files of different task attempts.
> # Task {{a1}} fails after several retries first because of the division by zero error
> # Task {{a1}} aborts the Parquet write task and tries to remove its task attempt output directory {{d1}} (a sub-directory of {{D1}})
> # Job {{A}} gets canceled on driver side, all the other 31 tasks also get canceled *asynchronously*
> # {{ParquetOutputCommitter.abortJob()}} tries to remove {{D1}} by first removing all its child files/directories first
>   Note that when testing with local directory, {{RawLocalFileSystem}} simply calls {{java.io.File.delete()}} to deletion, and only empty directories can be deleted.
> # Because tasks are canceled asynchronously, some other task, say {{a2}}, may just get scheduled and create its own task attempt directory {{d2}} under {{D1}}
> # Now {{ParquetOutputCommitter.abortJob()}} tries to finally remove {{D1}} itself, but fails because {{d2}} makes {{D1}} non-empty again
> Notice that this bug affects all Spark jobs that writes files with {{FileOutputCommitter}} and its subclasses which create and delete temporary directories.
> One of the possible way to fix this issue can be making task cancellation synchronous, but this also increases latency.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org