Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/22 16:49:10 UTC

[GitHub] [spark] steveloughran commented on a diff in pull request #36070: [SPARK-31675][CORE] Fix rename and delete files with different filesystem

steveloughran commented on code in PR #36070:
URL: https://github.com/apache/spark/pull/36070#discussion_r1029595674


##########
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala:
##########
@@ -194,19 +196,33 @@ class HadoopMapReduceCommitProtocol(
     if (hasValidPath) {
       val (allAbsPathFiles, allPartitionPaths) =
         taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      val hadoopConf = jobContext.getConfiguration
+      val fs = stagingDir.getFileSystem(hadoopConf)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
       val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
       if (dynamicPartitionOverwrite) {
         logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
-        absParentPaths.foreach(fs.delete(_, true))
+        absParentPaths.foreach(path => path.getFileSystem(hadoopConf)
+          .delete(path, true))
       }
       logDebug(s"Create absolute parent directories: $absParentPaths")
-      absParentPaths.foreach(fs.mkdirs)
+      absParentPaths.foreach(path => path.getFileSystem(hadoopConf).mkdirs(path))
       for ((src, dst) <- filesToMove) {
-        if (!fs.rename(new Path(src), new Path(dst))) {
+        val srcPath = new Path(src)
+        val dstPath = new Path(dst)
+        val srcFs = srcPath.getFileSystem(hadoopConf)
+        val dstFs = dstPath.getFileSystem(hadoopConf)
+        // Copying files across different file systems
+        if (needCopy(srcPath, dstPath, srcFs, dstFs)) {
+          if (!FileUtil.copy(srcFs, srcFs.listStatus(srcPath).map(_.getPath), dstFs, dstPath,

Review Comment:
   Can I highlight something I've noticed here: that copy() call stops as soon as read() on the source returns -1, without any check to validate the file length. Not great.
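
To illustrate the concern, here is a minimal sketch of a copy loop that also validates the number of bytes transferred. It uses plain java.io streams rather than Hadoop's FileUtil, and the names `LengthCheckedCopy`, `copyWithLengthCheck` and `expectedLen` are hypothetical, not part of Spark or Hadoop. In the commit protocol the expected length would presumably come from something like `srcFs.getFileStatus(srcPath).getLen`, which is an assumption about how the check could be wired in, not what the PR does.

```scala
import java.io.{IOException, InputStream, OutputStream}

// Hypothetical helper, for illustration only; not part of Spark or Hadoop.
object LengthCheckedCopy {

  /**
   * Copies `in` to `out` until read() returns -1, then verifies that the
   * number of bytes copied matches the length the caller expected.
   * Throws IOException on a mismatch instead of silently accepting a
   * truncated copy.
   */
  def copyWithLengthCheck(in: InputStream, out: OutputStream, expectedLen: Long): Long = {
    val buffer = new Array[Byte](8192)
    var total = 0L
    var n = in.read(buffer)
    while (n != -1) {
      out.write(buffer, 0, n)
      total += n
      n = in.read(buffer)
    }
    out.flush()
    // A read() of -1 only says the stream ended; it does not say the
    // whole file was delivered, so compare against the known length.
    if (total != expectedLen) {
      throw new IOException(s"Copied $total bytes but expected $expectedLen")
    }
    total
  }
}
```

With this shape, a source stream that ends early (for example, one that yields only 400 of an expected 1000 bytes) fails the copy with an IOException rather than reporting success.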



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org