You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/04/19 16:12:25 UTC

[spark] branch master updated: [SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e3ac56c  [SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception
e3ac56c is described below

commit e3ac56c8f4491c09af0d3e76cb91ad2bf898c368
Author: Liang-Chi Hsieh <li...@uber.com>
AuthorDate: Sun Apr 19 09:11:17 2020 -0700

    [SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception
    
    ### What changes were proposed in this pull request?
    
    To avoid a `FileAlreadyExistsException` when creating a checkpoint file, this PR proposes adding the stage attempt number to the temporary checkpoint file name.
    
    ### Why are the changes needed?
    
    On our production clusters, we have seen checkpointing failures. A failed stage can leave a partially written checkpoint file behind, and a task of the retried stage that writes the checkpoint file can then fail with `FileAlreadyExistsException` when it tries to create the same file, for example:
    ```
    org.apache.hadoop.fs.FileAlreadyExistsException: /path_to_checkpoint/rdd-114/.part-03154-attempt-0 for client xxx.xxx.xxx.xxx already exists
    org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:359)
    	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2353)
    	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2273)
    	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:728)
    	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
    	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
    	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:851)
    	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:794)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:422)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2490)
    
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:270)
    	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1263)
    	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1205)
    	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473)
    	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470)
    	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470)
    	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:872)
    	at org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:204)
    ```
    
    ### Does this PR introduce any user-facing change?
    
    Yes. After this PR, users will no longer see the "checkpoint file already exists" exception.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    Closes #28255 from viirya/delete-temp-checkpoint.
    
    Authored-by: Liang-Chi Hsieh <li...@uber.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   | 15 ++++++++-----
 .../scala/org/apache/spark/CheckpointSuite.scala   | 26 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index a5c07c0..b29b56b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -199,8 +199,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
 
     val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
     val finalOutputPath = new Path(outputDir, finalOutputName)
-    val tempOutputPath =
-      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
+    val tempOutputPath = new Path(outputDir,
+      s".$finalOutputName-attempt-${ctx.stageAttemptNumber()}-${ctx.attemptNumber()}")
 
     val bufferSize = env.conf.get(BUFFER_SIZE)
 
@@ -218,11 +218,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
-    Utils.tryWithSafeFinally {
+    Utils.tryWithSafeFinallyAndFailureCallbacks {
       serializeStream.writeAll(iterator)
-    } {
+    } (catchBlock = {
+      val deleted = fs.delete(tempOutputPath, false)
+      if (!deleted) {
+        logInfo(s"Failed to delete tempOutputPath $tempOutputPath.")
+      }
+    }, finallyBlock = {
       serializeStream.close()
-    }
+    })
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 6a108a5..a69381d 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_T
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rdd._
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
 
@@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
       assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton))
     }
   }
+
+  test("SPARK-31484: checkpoint should not fail in retry") {
+    withTempDir { checkpointDir =>
+      val conf = new SparkConf()
+        .set(UI_ENABLED.key, "false")
+      sc = new SparkContext("local[1]", "test", conf)
+      sc.setCheckpointDir(checkpointDir.toString)
+      val rdd = sc.makeRDD(1 to 200, numSlices = 4).repartition(1).mapPartitions { iter =>
+        iter.map { i =>
+          if (i > 100 && TaskContext.get().stageAttemptNumber() == 0) {
+            // throw new SparkException("Make first attemp failed.")
+            // Throw FetchFailedException to explicitly trigger stage resubmission.
+            // A normal exception will only trigger task resubmission in the same stage.
+            throw new FetchFailedException(null, 0, 0L, 0, 0, "Fake")
+          } else {
+            i
+          }
+        }
+      }
+      rdd.checkpoint()
+      assert(rdd.collect().toSeq === (1 to 200))
+      // Verify that RDD is checkpointed
+      assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org