Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:01:08 UTC

[jira] [Updated] (SPARK-19576) Task attempt paths exist in output path after saveAsNewAPIHadoopFile completes with speculation enabled

     [ https://issues.apache.org/jira/browse/SPARK-19576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19576:
---------------------------------
    Labels: bulk-closed  (was: )

> Task attempt paths exist in output path after saveAsNewAPIHadoopFile completes with speculation enabled
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19576
>                 URL: https://issues.apache.org/jira/browse/SPARK-19576
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: sharkd tu
>            Priority: Major
>              Labels: bulk-closed
>
> {{writeShard}} in {{saveAsNewAPIHadoopDataset}} always commits its tasks unconditionally. The problem is that when speculation is enabled, multiple attempts of the same task can commit their output to the same path, which can leave task attempt temporary paths in the output path after {{saveAsNewAPIHadoopFile}} completes.
> {code}
> -rw-r--r--    3   user group       0   2017-02-11 19:36 hdfs://.../output/_SUCCESS
> drwxr-xr-x    -   user group       0   2017-02-11 19:36 hdfs://.../output/attempt_201702111936_32487_r_000044_0
> -rw-r--r--    3   user group    8952   2017-02-11 19:36 hdfs://.../output/part-r-00000
> -rw-r--r--    3   user group    7878   2017-02-11 19:36 hdfs://.../output/part-r-00001
> ...
> {code}
> Assume two task attempts commit at the same time. Both may rename their task attempt paths to the committed task path concurrently. When one attempt's {{rename}} completes first, the other attempt's {{rename}} moves its task attempt path *under* the already-committed task path.
> {{commitTask}} in {{FileOutputCommitter}}: 
> {code}
> public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
>   throws IOException {
>     TaskAttemptID attemptId = context.getTaskAttemptID();
>     if (hasOutputPath()) {
>       context.progress();
>       if(taskAttemptPath == null) {
>         taskAttemptPath = getTaskAttemptPath(context);
>       }
>       Path committedTaskPath = getCommittedTaskPath(context);
>       FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
>       if (fs.exists(taskAttemptPath)) {
>         if(fs.exists(committedTaskPath)) {
>           if(!fs.delete(committedTaskPath, true)) {
>             throw new IOException("Could not delete " + committedTaskPath);
>           }
>         }
>         if(!fs.rename(taskAttemptPath, committedTaskPath)) {
>           throw new IOException("Could not rename " + taskAttemptPath + " to "
>               + committedTaskPath);
>         }
>         LOG.info("Saved output of task '" + attemptId + "' to " + 
>             committedTaskPath);
>       } else {
>         LOG.warn("No Output found for " + attemptId);
>       }
>     } else {
>       LOG.warn("Output Path is null in commitTask()");
>     }
>   }
> {code}
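The race above can be sketched with a small, hypothetical in-memory model (this is not Hadoop code; class and path names are illustrative). It reproduces the one HDFS semantic that matters here: {{rename(src, dst)}} moves {{src}} *under* {{dst}} when {{dst}} is an existing directory, so the second speculative attempt's directory survives inside the output.

```java
import java.util.TreeSet;

// Hypothetical model of the commit race; not Hadoop code.
public class CommitRaceSketch {

    // Mimics HDFS FileSystem.rename: if dst already exists as a
    // directory, src is moved *under* dst instead of replacing it.
    static void rename(TreeSet<String> fs, String src, String dst) {
        String target = fs.contains(dst)
                ? dst + src.substring(src.lastIndexOf('/'))  // lands under dst
                : dst;
        fs.remove(src);
        fs.add(target);
    }

    static TreeSet<String> simulate() {
        TreeSet<String> fs = new TreeSet<>();
        String committed = "output/task_201702111936_32487_r_000044";
        String attempt0 = "output/_temporary/attempt_201702111936_32487_r_000044_0";
        String attempt1 = "output/_temporary/attempt_201702111936_32487_r_000044_1";
        fs.add(attempt0);
        fs.add(attempt1);

        // Both speculative attempts observed
        // fs.exists(committedTaskPath) == false, then both rename:
        rename(fs, attempt0, committed); // first rename wins
        rename(fs, attempt1, committed); // second lands *under* committed
        return fs;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

After the second rename, the committed task path contains a leftover {{attempt_..._1}} directory, matching the listing shown earlier in this report.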
> In any case, {{writeShard}} in {{saveAsNewAPIHadoopDataset}} should not commit its tasks unconditionally. A similar issue, triggered by calling {{saveAsHadoopFile}}, was already fixed in SPARK-4879. We should fix this remaining case, triggered by calling {{saveAsNewAPIHadoopFile}}, in the same way.
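A minimal sketch of the guarded-commit idea behind the SPARK-4879 fix (the class and method names below are illustrative, not Spark's actual API; Spark's real coordination is driver-side and handles stages, failures, and retries): a coordinator grants commit permission to exactly one attempt per task, and losing attempts abort instead of renaming.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only, not Spark's OutputCommitCoordinator.
class CommitCoordinatorSketch {
    // Attempt id of the single attempt allowed to commit, or null.
    private final AtomicReference<Integer> authorized = new AtomicReference<>();

    // First attempt to ask wins; repeat calls by the winner stay true,
    // every other attempt is denied and should abort its task.
    boolean canCommit(int attemptId) {
        return authorized.compareAndSet(null, attemptId)
                || Integer.valueOf(attemptId).equals(authorized.get());
    }
}
```

With a guard like this, a task attempt would ask for permission before invoking {{FileOutputCommitter.commitTask}} and call {{abortTask}} when denied, so only one attempt ever renames into the committed task path and no attempt directory is left behind.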



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org