Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/14 08:31:25 UTC

[GitHub] [spark] LuciferYang opened a new pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

LuciferYang opened a new pull request #35509:
URL: https://github.com/apache/spark/pull/35509


   ### What changes were proposed in this pull request?
   `KubernetesUtils#uploadFileToHadoopCompatibleFS` defines the input parameters `delSrc` and `overwrite`, but hard-coded constants (`false` and `true`) are used when invoking `FileSystem.copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)`. This PR changes the code to use the passed-in `delSrc` and `overwrite` when invoking the `copyFromLocalFile` method.
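
   The bug pattern can be sketched as follows. This is a minimal, self-contained model of the issue, not the actual Spark code; the names and the tuple-returning stand-in for `copyFromLocalFile` are illustrative only:

   ```scala
   object UploadSketch {
     // Stand-in for FileSystem.copyFromLocalFile: returns the flags it actually received.
     def copyFromLocalFile(delSrc: Boolean, overwrite: Boolean): (Boolean, Boolean) =
       (delSrc, overwrite)

     // Buggy shape: the parameters are accepted but constants are forwarded instead.
     def uploadBuggy(delSrc: Boolean, overwrite: Boolean): (Boolean, Boolean) =
       copyFromLocalFile(false, true)

     // Fixed shape: the caller's flags are forwarded.
     def uploadFixed(delSrc: Boolean, overwrite: Boolean): (Boolean, Boolean) =
       copyFromLocalFile(delSrc, overwrite)
   }

   // The buggy version ignores the caller's arguments entirely.
   assert(UploadSketch.uploadBuggy(delSrc = true, overwrite = false) == (false, true))
   assert(UploadSketch.uploadFixed(delSrc = true, overwrite = false) == (true, false))
   ```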
   
   
   ### Why are the changes needed?
   Bug fix
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Pass GA


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `uploadFileToHadoopCompatibleFS` to use `delSrc` and `overwrite` parameters

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r807482492



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +72,57 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    withTempDir { srcDir =>
+      withTempDir { destDir =>
+        val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+        val fileName = "test.txt"
+        val srcFile = new File(srcDir, fileName)
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+
+        def checkUploadException(delSrc: Boolean, overwrite: Boolean): Unit = {
+          val message = intercept[SparkException] {
+            KubernetesUtils.invokePrivate(upload(src, dest, fs, delSrc, overwrite))
+          }.getMessage
+          assert(message.contains("Error uploading file"))
+        }
+
+        def appendFileAndUpload(content: String, delSrc: Boolean, overwrite: Boolean): Unit = {
+          FileUtils.write(srcFile, content, StandardCharsets.UTF_8, true)
+          KubernetesUtils.invokePrivate(upload(src, dest, fs, delSrc, overwrite))
+        }
+
+        // Write a new file, upload file with delSrc = false and overwrite = true.
+        // Upload successful and record the `fileLength`.
+        appendFileAndUpload("init-content", delSrc = false, overwrite = true)
+        val firstLength = fs.getFileStatus(dest).getLen
+
+        // Append the file, upload file with delSrc = false and overwrite = true.
+        // Upload succeeded but `fileLength` changed.
+        appendFileAndUpload("append-content", delSrc = false, overwrite = true)
+        val secondLength = fs.getFileStatus(dest).getLen
+        assert(firstLength < secondLength)
+
+        // Upload file with delSrc = false and overwrite = false.
+        // Upload failed because dest exists.
+        checkUploadException(delSrc = false, overwrite = false)

Review comment:
       [d68f119](https://github.com/apache/spark/pull/35509/commits/d68f1196a42359a1cfa0ac054cb3961434f93d75) fixes this. Sorry, I got off work a little early yesterday.
   
    






[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806554473



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       OK






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806564535



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       When it comes to test time, the Apache Spark community is very careful and has been trying to reduce it again and again. `sleep` is one of the typical patterns we want to avoid.
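
       A sleep-free alternative, which the thread later converges on, is to detect the re-upload via file-length growth instead of waiting for the mtime to tick. A minimal, self-contained sketch using plain `java.nio` in place of the Hadoop `FileSystem` API (names are illustrative):

       ```scala
       import java.nio.charset.StandardCharsets
       import java.nio.file.{Files, StandardCopyOption, StandardOpenOption}

       val src = Files.createTempFile("test", ".txt")
       val dest = Files.createTempFile("dest", ".txt")

       // First "upload": write and copy, record the destination length.
       Files.write(src, "init-content".getBytes(StandardCharsets.UTF_8))
       Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING)
       val firstLength = Files.size(dest)

       // Append to the source and "upload" again; a length change proves
       // the overwrite happened, with no sleep required.
       Files.write(src, "append-content".getBytes(StandardCharsets.UTF_8),
         StandardOpenOption.APPEND)
       Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING)
       val secondLength = Files.size(dest)
       assert(firstLength < secondLength)
       ```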






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806543020



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,43 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())

Review comment:
       If possible, shall we use `StandardCharsets.UTF_8` instead of `Charset.defaultCharset()`?






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806541144



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       So, we don't test `delSrc=true` and `overwrite=false`?






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806550245



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Since the unit of `getModificationTime` is a millisecond, can we try `TimeUnit.MILLISECONDS.sleep(100)`?






[GitHub] [spark] LuciferYang commented on pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1039891455


   @dongjoon-hyun [bb04211](https://github.com/apache/spark/pull/35509/commits/bb04211f2f51bf45a81871c87f24badc90f4b26a) adds a new test case




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806536581



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, true, true))
   ```






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806535291



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))

Review comment:
       ```scala
   -    val uploadFileToHadoopCompatibleFSMethod =
   -      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
   +    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
   ```






[GitHub] [spark] LuciferYang commented on pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1040017309


   @dongjoon-hyun [0a9bce5](https://github.com/apache/spark/pull/35509/commits/0a9bce5bfaaacfc02ca11e45bef4faaf9a1c0962) updated the test case:
   
   1. Use the file length instead of the mtime to check that the file changed, by updating the file content
   2. Add a test for `delSrc = true` and `overwrite = false`
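
   The `delSrc = true` / `overwrite = false` case can be sketched as a pure-logic model. This is a hypothetical model of the expected `copyFromLocalFile` semantics (when the destination exists and overwrite is disabled, the copy fails and the source is not deleted), not the actual Hadoop implementation:

   ```scala
   // Returns Left(error) on failure, Right(srcDeleted) on success.
   def copy(destExists: Boolean, delSrc: Boolean, overwrite: Boolean): Either[String, Boolean] =
     if (destExists && !overwrite) Left("Error uploading file") // dest exists, no overwrite: fail
     else Right(delSrc)                                         // success; src deleted iff delSrc

   // delSrc = true, overwrite = false against an existing dest: fails, src survives.
   assert(copy(destExists = true, delSrc = true, overwrite = false) == Left("Error uploading file"))
   // Same flags against a fresh dest: succeeds and deletes the source.
   assert(copy(destExists = false, delSrc = true, overwrite = false) == Right(true))
   ```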




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806553995



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Please use `FileUtils.readFileToString`.






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806535077



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -17,13 +17,21 @@
 
 package org.apache.spark.deploy.k8s
 
+import java.io.File
+import java.nio.charset.Charset
+import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite

Review comment:
       ```scala
   -import org.apache.spark.SparkFunSuite
   +import org.apache.spark.{SparkException, SparkFunSuite}
   ```






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806535784



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException

Review comment:
       ```scala
   -        import org.apache.spark.SparkException
   ```






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806540505



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)

Review comment:
       ditto. `<` instead of `!=`.






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806550773



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Oh, got it.






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806536402



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))

Review comment:
       ```scala
   -          KubernetesUtils
   -            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
   +          KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
   ```






[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r805603065



##########
File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
##########
@@ -344,7 +344,7 @@ object KubernetesUtils extends Logging {
       delSrc : Boolean = false,
       overwrite: Boolean = true): Unit = {
     try {
-      fs.copyFromLocalFile(false, true, src, dest)
+      fs.copyFromLocalFile(delSrc, overwrite, src, dest)

Review comment:
       The signature of this method is:
   ```java
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
           Configuration conf = this.getConf();
           FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
       }
   ```
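    For readers following along, a rough sketch of the semantics those two flags are expected to have, using only plain `java.nio` for illustration — the class and helper method here are hypothetical, not Hadoop's actual implementation:

    ```java
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    // Illustrative sketch: mirrors the delSrc/overwrite behavior that
    // FileSystem.copyFromLocalFile(delSrc, overwrite, src, dst) is expected
    // to have. Names here are hypothetical, not Hadoop APIs.
    public class CopySemanticsDemo {
        static void copy(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException {
            if (Files.exists(dst) && !overwrite) {
                // overwrite=false: refuse to clobber an existing destination
                throw new IOException("Destination exists and overwrite=false: " + dst);
            }
            Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
            if (delSrc) {
                // delSrc=true: remove the local source after a successful copy
                Files.delete(src);
            }
        }

        public static void main(String[] args) throws IOException {
            Path dir = Files.createTempDirectory("copy-demo");
            Path src = dir.resolve("test.txt");
            Files.write(src, "test".getBytes());
            Path dst = dir.resolve("dest.txt");

            // delSrc=false, overwrite=true: upload succeeds, source is kept
            copy(false, true, src, dst);
            if (!Files.exists(src) || !Files.exists(dst)) throw new AssertionError("first copy failed");

            // delSrc=false, overwrite=false: destination exists, so the copy is rejected
            boolean rejected = false;
            try {
                copy(false, false, src, dst);
            } catch (IOException e) {
                rejected = true;
            }
            if (!rejected) throw new AssertionError("expected overwrite=false to fail");

            // delSrc=true, overwrite=true: upload succeeds and the source is removed
            copy(true, true, src, dst);
            if (Files.exists(src)) throw new AssertionError("expected src to be deleted");
        }
    }
    ```

    This is why hard-coding `(false, true)` silently ignored any caller that asked for `delSrc=true` or `overwrite=false`.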








[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806552246



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       Yes, we don't test `delSrc=true` and `overwrite=false` now because `Scenario 4` already deletes `src`; let me see how to add this scenario.
   
   






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806539744



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       ditto. This is too high.






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806540310



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       Shall we use the stronger condition `<` instead of the weaker condition `!=`?






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806539279



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       `1s` is too high. Shall we minimize this delay?
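       The concern behind the sleep is that some filesystems record modification times at one-second granularity, so two back-to-back copies can land on the same timestamp. A rough `java.nio` illustration (not the Hadoop test itself; the demo class name is made up):

       ```java
       import java.nio.file.Files;
       import java.nio.file.Path;
       import java.nio.file.StandardCopyOption;

       // Shows why the test sleeps between uploads: a second copy only gets a
       // strictly larger mtime once enough wall-clock time has passed for the
       // filesystem's timestamp resolution.
       public class MtimeGranularityDemo {
           public static void main(String[] args) throws Exception {
               Path dir = Files.createTempDirectory("mtime-demo");
               Path src = dir.resolve("src.txt");
               Files.write(src, "test".getBytes());
               Path dst = dir.resolve("dst.txt");

               Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
               long first = Files.getLastModifiedTime(dst).toMillis();

               Thread.sleep(1100); // > 1s, so even second-granularity filesystems advance
               Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
               long second = Files.getLastModifiedTime(dst).toMillis();

               if (!(first < second)) throw new AssertionError("expected strictly increasing mtime");
           }
       }
       ```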






[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806545633



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,43 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())

Review comment:
       [914e93c](https://github.com/apache/spark/pull/35509/commits/914e93c04a1162ab47e8842b2b6db9fb7e59699e) fix this






[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806546759



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)

Review comment:
       done

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       done






[GitHub] [spark] LuciferYang edited a comment on pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang edited a comment on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1040017309








[GitHub] [spark] dongjoon-hyun commented on pull request #35509: [SPARK-38201][K8S] Fix `uploadFileToHadoopCompatibleFS` to use `delSrc` and `overwrite` parameters

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1041074642


   Thank you, @LuciferYang and all. Merged to master.




[GitHub] [spark] dongjoon-hyun closed pull request #35509: [SPARK-38201][K8S] Fix `uploadFileToHadoopCompatibleFS` to use `delSrc` and `overwrite` parameters

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun closed pull request #35509:
URL: https://github.com/apache/spark/pull/35509


   




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806536126



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
   ```






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806535077



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -17,13 +17,21 @@
 
 package org.apache.spark.deploy.k8s
 
+import java.io.File
+import java.nio.charset.Charset
+import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite

Review comment:
       ```scala
   -import org.apache.spark.SparkFunSuite
   +import org.apache.spark.{SparkException, SparkFunSuite}
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))

Review comment:
       ```scala
   -    val uploadFileToHadoopCompatibleFSMethod =
   -      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
   +    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
   ```
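
   For readers unfamiliar with ScalaTest's `PrivateMethodTester`, the pattern being shortened here boils down to looking up a private method by name and invoking it. A minimal sketch of that mechanism using plain Java reflection (hypothetical names, no ScalaTest dependency) is:

   ```scala
   // Minimal sketch of what `invokePrivate(PrivateMethod(...))` does under
   // the hood: find a private method by name via reflection and call it.
   // All names here are hypothetical; ScalaTest is not required.
   object InvokePrivateSketch {
     private def upload(delSrc: Boolean, overwrite: Boolean): String =
       s"delSrc=$delSrc overwrite=$overwrite"

     def main(args: Array[String]): Unit = {
       val m = getClass.getDeclaredMethods.find(_.getName == "upload").get
       m.setAccessible(true)
       println(m.invoke(this, Boolean.box(false), Boolean.box(true)))
     }
   }
   ```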

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException

Review comment:
       ```
   -        import org.apache.spark.SparkException
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException

Review comment:
       ```scala
   -        import org.apache.spark.SparkException
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))

Review comment:
       ```scala
   -          KubernetesUtils
   -            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
   +          KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, true, true))
   ```

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       `1s` is too high. Shall we minimize this delay?

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       ditto. This is too high.

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       Shall we use a stronger `>` condition instead of weaker condition `!=`?

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)

Review comment:
       ditto. `<` instead of `!=`.

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       Shall we use a stronger `<` condition instead of weaker condition `!=`?

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       So, we don't test `delSrc=true` and `overwrite=false`?
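
   For reference, the semantics expected of that remaining combination (`delSrc = true`, `overwrite = false` against an existing destination) can be sketched with plain `java.nio.file` instead of the Hadoop `FileSystem` API — the copy should fail and the source must not be deleted. This is only an illustrative analogy, not the suite's actual code:

   ```scala
   import java.nio.file.{Files, FileAlreadyExistsException}

   // Hypothetical illustration of delSrc = true, overwrite = false:
   // copying onto an existing destination fails, so the source survives.
   object DelSrcNoOverwriteSketch {
     def main(args: Array[String]): Unit = {
       val dir  = Files.createTempDirectory("spark-38201-")
       val src  = Files.write(dir.resolve("src.txt"), "test".getBytes("UTF-8"))
       val dest = Files.write(dir.resolve("dest.txt"), "old".getBytes("UTF-8"))
       val failed =
         // no REPLACE_EXISTING option => overwrite = false
         try { Files.copy(src, dest); false }
         catch { case _: FileAlreadyExistsException => true }
       // The copy failed, so the delSrc step must not have run.
       println(s"failed=$failed srcStillExists=${Files.exists(src)}")
     }
   }
   ```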

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,43 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())

Review comment:
       If possible, shall we use `StandardCharsets.UTF_8` instead of `Charset.defaultCharset()`?

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Since the unit of `getModificationTime` is a millisecond, can we try `TimeUnit.MILLISECONDS.sleep(100)`?

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Oh, got it.

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       In that case, we had better check the actual content of file instead of the modification time.

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Please use `FileUtils.readFileToString`.
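
   One way the suggested content check could look (a sketch only, not code from the PR; it assumes the test's `dest` path and commons-io's `FileUtils` plus `java.io.File` and `java.nio.charset.StandardCharsets` in scope):

   ```scala
   // Sketch: assert on the uploaded file's content instead of sleeping
   // and comparing modification times. `dest` is the Hadoop Path the
   // test uploads to; "test" is the content written to srcFile above.
   val destFile = new File(dest.toUri.getPath)
   assert(FileUtils.readFileToString(destFile, StandardCharsets.UTF_8) === "test")
   ```

   A content assertion avoids both the `sleep` and the `LocalFileSystem` second-granularity issue discussed above.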

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       When it comes to test time, the Apache Spark community is very careful and has been trying to reduce it again and again. `Sleep` is one of the typical patterns we want to avoid.

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +72,61 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+
+    def checkUploadFailed(f: () => Unit): Unit = {
+      val message = intercept[SparkException] {
+        f()
+      }.getMessage
+      assert(message.contains("Error uploading file"))
+    }
+
+    withTempDir { srcDir =>
+      withTempDir { destDir =>
+        val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+        val fileName = "test.txt"
+        val srcFile = new File(srcDir, fileName)
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Write a new file, upload file with delSrc = false and overwrite = true.
+        // Upload successful and record the `fileLength`.
+        FileUtils.write(srcFile, "init-content", StandardCharsets.UTF_8)
+        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
+        val firstLength = fs.getFileStatus(dest).getLen
+
+        // Append the file, upload file with delSrc = false and overwrite = true.
+        // Upload succeeded but `fileLength` changed.
+        FileUtils.write(srcFile, "append-content", StandardCharsets.UTF_8, true)
+        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
+        val secondLength = fs.getFileStatus(dest).getLen
+        assert(firstLength < secondLength)
+
+        // Upload file with delSrc = false and overwrite = false.
+        // Upload failed because dest exists.
+        val message1 = intercept[SparkException] {
+          KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
+        }.getMessage
+        assert(message1.contains("Error uploading file"))

Review comment:
       It seems that we need to check that the uploaded file is not changed here.
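
   A possible shape for that assertion (a sketch only, reusing the test's `src`, `dest`, `fs`, and `upload` values, with commons-io's `FileUtils` in scope):

   ```scala
   // Sketch: a failed upload (overwrite = false, dest exists) must leave
   // the previously uploaded dest file untouched.
   val destFile = new File(dest.toUri.getPath)
   val before = FileUtils.readFileToString(destFile, StandardCharsets.UTF_8)
   intercept[SparkException] {
     KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
   }
   assert(FileUtils.readFileToString(destFile, StandardCharsets.UTF_8) === before)
   ```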




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806550196



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       The result of `fs.getFileStatus(dest).getModificationTime` is `xxxx000` millis when `fs` is a `LocalFileSystem`; the last three digits are always 0, so a sub-second sleep would not change the observed value.
   






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806611191



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +72,61 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+
+    def checkUploadFailed(f: () => Unit): Unit = {
+      val message = intercept[SparkException] {
+        f()
+      }.getMessage
+      assert(message.contains("Error uploading file"))
+    }
+
+    withTempDir { srcDir =>
+      withTempDir { destDir =>
+        val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+        val fileName = "test.txt"
+        val srcFile = new File(srcDir, fileName)
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Write a new file, upload file with delSrc = false and overwrite = true.
+        // Upload successful and record the `fileLength`.
+        FileUtils.write(srcFile, "init-content", StandardCharsets.UTF_8)
+        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
+        val firstLength = fs.getFileStatus(dest).getLen
+
+        // Append the file, upload file with delSrc = false and overwrite = true.
+        // Upload succeeded but `fileLength` changed.
+        FileUtils.write(srcFile, "append-content", StandardCharsets.UTF_8, true)
+        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
+        val secondLength = fs.getFileStatus(dest).getLen
+        assert(firstLength < secondLength)
+
+        // Upload file with delSrc = false and overwrite = false.
+        // Upload failed because dest exists.
+        val message1 = intercept[SparkException] {
+          KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
+        }.getMessage
+        assert(message1.contains("Error uploading file"))

Review comment:
       It seems that we need to check that the uploaded file is not changed here.






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `uploadFileToHadoopCompatibleFS` to use `delSrc` and `overwrite` parameters

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r807192879



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +72,57 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    withTempDir { srcDir =>
+      withTempDir { destDir =>
+        val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+        val fileName = "test.txt"
+        val srcFile = new File(srcDir, fileName)
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+
+        def checkUploadException(delSrc: Boolean, overwrite: Boolean): Unit = {
+          val message = intercept[SparkException] {
+            KubernetesUtils.invokePrivate(upload(src, dest, fs, delSrc, overwrite))
+          }.getMessage
+          assert(message.contains("Error uploading file"))
+        }
+
+        def appendFileAndUpload(content: String, delSrc: Boolean, overwrite: Boolean): Unit = {
+          FileUtils.write(srcFile, content, StandardCharsets.UTF_8, true)
+          KubernetesUtils.invokePrivate(upload(src, dest, fs, delSrc, overwrite))
+        }
+
+        // Write a new file, upload file with delSrc = false and overwrite = true.
+        // Upload successful and record the `fileLength`.
+        appendFileAndUpload("init-content", delSrc = false, overwrite = true)
+        val firstLength = fs.getFileStatus(dest).getLen
+
+        // Append the file, upload file with delSrc = false and overwrite = true.
+        // Upload succeeded but `fileLength` changed.
+        appendFileAndUpload("append-content", delSrc = false, overwrite = true)
+        val secondLength = fs.getFileStatus(dest).getLen
+        assert(firstLength < secondLength)
+
+        // Upload file with delSrc = false and overwrite = false.
+        // Upload failed because dest exists.
+        checkUploadException(delSrc = false, overwrite = false)

Review comment:
       The last comment is still not addressed.
   - https://github.com/apache/spark/pull/35509#discussion_r806611191

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +72,57 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    withTempDir { srcDir =>
+      withTempDir { destDir =>
+        val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+        val fileName = "test.txt"
+        val srcFile = new File(srcDir, fileName)
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+
+        def checkUploadException(delSrc: Boolean, overwrite: Boolean): Unit = {
+          val message = intercept[SparkException] {
+            KubernetesUtils.invokePrivate(upload(src, dest, fs, delSrc, overwrite))
+          }.getMessage
+          assert(message.contains("Error uploading file"))
+        }
+
+        def appendFileAndUpload(content: String, delSrc: Boolean, overwrite: Boolean): Unit = {
+          FileUtils.write(srcFile, content, StandardCharsets.UTF_8, true)
+          KubernetesUtils.invokePrivate(upload(src, dest, fs, delSrc, overwrite))
+        }
+
+        // Write a new file, upload file with delSrc = false and overwrite = true.
+        // Upload successful and record the `fileLength`.
+        appendFileAndUpload("init-content", delSrc = false, overwrite = true)
+        val firstLength = fs.getFileStatus(dest).getLen
+
+        // Append the file, upload file with delSrc = false and overwrite = true.
+        // Upload succeeded but `fileLength` changed.
+        appendFileAndUpload("append-content", delSrc = false, overwrite = true)
+        val secondLength = fs.getFileStatus(dest).getLen
+        assert(firstLength < secondLength)
+
+        // Upload file with delSrc = false and overwrite = false.
+        // Upload failed because dest exists.
+        checkUploadException(delSrc = false, overwrite = false)

Review comment:
       We need to check the file is identical after this.




For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #35509: [SPARK-38201][K8S] Fix `uploadFileToHadoopCompatibleFS` to use `delSrc` and `overwrite` parameters

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1041076513


   Thank you very much for your guidance @dongjoon-hyun ~
   Thanks @martin-g @weixiuli 




[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806552246



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       Yes, we don't test `delSrc=true` and `overwrite=false` now because `Scenario 4` already deletes `src`






[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806552130



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       In that case, we had better check the actual content of file instead of the modification time.






[GitHub] [spark] LuciferYang edited a comment on pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang edited a comment on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1040017309








[GitHub] [spark] LuciferYang commented on pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on pull request #35509:
URL: https://github.com/apache/spark/pull/35509#issuecomment-1039891455








[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806540310



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       Shall we use the stronger `>` condition instead of the weaker `!=`?






[GitHub] [spark] LuciferYang commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806545633



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,43 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())

Review comment:
       [914e93c](https://github.com/apache/spark/pull/35509/commits/914e93c04a1162ab47e8842b2b6db9fb7e59699e) fix this

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)

Review comment:
       done

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       done

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       The result of `fs.getFileStatus(dest).getModificationTime` is `xxxx000` millis when `fs` is a `LocalFileSystem`; the last three digits are always 0, so sub-second timestamp changes are not observable and the test must sleep for at least 1s between uploads.
   

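   An illustrative sketch of that granularity point (not Spark code; the timestamps are made-up examples):

   ```scala
   // Model of second-granularity modification times on a LocalFileSystem:
   // the reported millis are truncated to whole seconds, so the last three
   // digits are always zero.
   def truncateToSeconds(millis: Long): Long = (millis / 1000) * 1000

   // Two uploads 864 ms apart land in the same truncated second ...
   val first  = truncateToSeconds(1644827485123L) // -> 1644827485000
   val second = truncateToSeconds(1644827485987L) // -> 1644827485000
   // ... so only a gap of >= 1 second guarantees a visible timestamp change.
   val third  = truncateToSeconds(1644827485123L + 1000L) // -> 1644827486000
   ```

   This is why the suite calls `TimeUnit.SECONDS.sleep(1)` between uploads rather than comparing raw millisecond timestamps.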
##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       Yes, we don't test `delSrc=true` and `overwrite=false` now because `Scenario 4` already deletes `src`

##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       Yes, we don't test `delSrc=true` and `overwrite=false` now because `Scenario 4` already deletes `src`; let me see how to add this scenario.
   
   

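   One hypothetical way to cover the remaining combination is to recreate `src` after Scenario 4 and then attempt an upload with `delSrc = true, overwrite = false` against the still-existing `dest`. Sketched here with `java.nio` stand-ins for the Hadoop `FileSystem` calls (names are illustrative; the real suite wraps failures in `SparkException`):

   ```scala
   // Hypothetical "Scenario 5": delSrc = true, overwrite = false against an
   // existing dest. The copy must fail before the delete runs, so src survives.
   import java.nio.file.{Files, Path, FileAlreadyExistsException, StandardCopyOption}

   def uploadModel(src: Path, dest: Path, delSrc: Boolean, overwrite: Boolean): Boolean =
     try {
       if (overwrite) {
         Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING)
       } else {
         Files.copy(src, dest) // throws because dest already exists
       }
       if (delSrc) Files.delete(src) // only reached on a successful copy
       true
     } catch { case _: FileAlreadyExistsException => false }
   ```

   The key assertion for this scenario is that `src` still exists after the failed upload, proving the delete never runs when the copy fails.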
##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun commented on a change in pull request #35509: [SPARK-38201][K8S] Fix `KubernetesUtils#uploadFileToHadoopCompatibleFS` use passed in `delSrc` and `overwrite`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806535509



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException

Review comment:
       ```
   -        import org.apache.spark.SparkException
   ```



