Posted to issues@spark.apache.org by "Erik Krogen (Jira)" <ji...@apache.org> on 2021/04/16 16:05:00 UTC

[jira] [Created] (SPARK-35106) HadoopMapReduceCommitProtocol performs bad rename when dynamic partition overwrite is used

Erik Krogen created SPARK-35106:
-----------------------------------

             Summary: HadoopMapReduceCommitProtocol performs bad rename when dynamic partition overwrite is used
                 Key: SPARK-35106
                 URL: https://issues.apache.org/jira/browse/SPARK-35106
             Project: Spark
          Issue Type: Bug
          Components: Input/Output, Spark Core
    Affects Versions: 3.1.1
            Reporter: Erik Krogen


Recently, while evaluating the code in {{HadoopMapReduceCommitProtocol#commitJob}}, I found a problematic code path in the {{dynamicPartitionOverwrite == true}} scenario:

{code:language=scala}
      // BLOCK 1
      if (dynamicPartitionOverwrite) {
        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
        absPartitionPaths.foreach(fs.delete(_, true))
      }
      // BLOCK 2
      for ((src, dst) <- filesToMove) {
        fs.rename(new Path(src), new Path(dst))
      }

      // BLOCK 3
      if (dynamicPartitionOverwrite) {
        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
        for (part <- partitionPaths) {
          val finalPartPath = new Path(path, part)
          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
            // According to the official hadoop FileSystem API spec, delete op should assume
            // the destination is no longer present regardless of return value, thus we do not
            // need to double check if finalPartPath exists before rename.
            // Also in our case, based on the spec, delete returns false only when finalPartPath
            // does not exist. When this happens, we need to take action if parent of finalPartPath
            // also does not exist(e.g. the scenario described on SPARK-23815), because
            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
            // a parent that exists, otherwise we may get unexpected result on the rename.
            fs.mkdirs(finalPartPath.getParent)
          }
          fs.rename(new Path(stagingDir, part), finalPartPath)
        }
      }
{code}

Assuming {{dynamicPartitionOverwrite == true}}, we have the following sequence of events:
# Block 1 deletes all parent directories of {{filesToMove.values}}
# Block 2 attempts to rename all {{filesToMove.keys}} to {{filesToMove.values}} (see the sketch after this list)
# Block 3 does directory-level renames to place files into their final locations
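
To make the failure in steps 1 and 2 concrete, here is a minimal standalone sketch (not Spark code; the paths and object name are made up) that reproduces a rename into a just-deleted directory. It assumes the Hadoop client on the classpath and {{fs.defaultFS}} pointing at an HDFS cluster:

{code:language=scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameAfterDeleteDemo {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    // Hypothetical staging file and final destination.
    val src = new Path("/tmp/staging/part-00000")
    val dst = new Path("/tmp/output/dt=2021-04-16/part-00000")

    fs.mkdirs(src.getParent)
    fs.create(src).close()

    // Block 1 equivalent: delete the destination's parent directory.
    fs.delete(dst.getParent, true)

    // Block 2 equivalent: rename into the now-missing parent directory.
    // Per the FileSystem spec, HDFS reports this failure by returning false
    // rather than throwing, so the file silently stays in the staging dir.
    // (Local filesystem implementations may behave differently.)
    val renamed = fs.rename(src, dst)
    println(s"rename returned: $renamed") // false on HDFS
  }
}
{code}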

All renames in Block 2 will always fail, since every parent directory of {{filesToMove.values}} was just deleted in Block 1. On a normal HDFS deployment, the contract of {{fs.rename}} is to return {{false}} in this failure case rather than throw an exception. There is a separate issue here in that Block 2 should probably be checking those {{false}} return values -- but this is what allows {{dynamicPartitionOverwrite}} to "work", albeit with a batch of silently failed renames in the middle. Really, we should only run Block 2 in the {{dynamicPartitionOverwrite == false}} case, and consolidate Blocks 1 and 3 to run in the {{true}} case, as sketched below.
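
A rough sketch of that restructuring, using a hypothetical helper name ({{commitFileMoves}}) and a simplified signature rather than the actual {{commitJob}} code:

{code:language=scala}
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

object CommitProtocolSketch {
  // Hypothetical helper, not the real commitJob: Block 2 runs only when
  // dynamicPartitionOverwrite is false, and a failed rename is now fatal
  // instead of silently ignored.
  def commitFileMoves(
      fs: FileSystem,
      filesToMove: Map[String, String],
      dynamicPartitionOverwrite: Boolean): Unit = {
    if (!dynamicPartitionOverwrite) {
      for ((src, dst) <- filesToMove) {
        // Surface the previously ignored false return value.
        if (!fs.rename(new Path(src), new Path(dst))) {
          throw new IOException(s"Failed to rename $src to $dst when committing files")
        }
      }
    }
    // else: these files would be handled by the consolidated Block 1 +
    // Block 3 delete-and-directory-rename logic for dynamic partition
    // overwrite, rather than by per-file renames.
  }
}
{code}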

We discovered this issue when testing against a {{FileSystem}} implementation that throws an exception on this failed rename instead of returning {{false}}, escalating the silently ignored rename failures into real job failures.
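
For illustration, such strict behavior can be emulated by wrapping an existing {{FileSystem}} so that a {{false}} return from {{rename}} becomes an exception (this wrapper is hypothetical, not the implementation we actually tested against):

{code:language=scala}
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, FilterFileSystem, Path}

// Illustrative wrapper: escalates rename's false return into an exception,
// which is how the silent Block 2 failures would become visible.
class StrictRenameFileSystem(underlying: FileSystem)
    extends FilterFileSystem(underlying) {
  override def rename(src: Path, dst: Path): Boolean = {
    if (!super.rename(src, dst)) {
      throw new IOException(s"rename failed: $src -> $dst")
    }
    true
  }
}
{code}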


