You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2019/09/27 19:36:45 UTC

[spark] branch master updated: [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d72f398  [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
d72f398 is described below

commit d72f39897b00d0bbd7a4db9de281a1256fcf908d
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Fri Sep 27 12:35:26 2019 -0700

    [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
    
    ## What changes were proposed in this pull request?
    
    SPARK-27210 enabled ManifestFileCommitProtocol to clean up incomplete output files at the task level when a task is aborted.
    
    This patch extends the scope of the cleanup: ManifestFileCommitProtocol now also cleans up complete but invalid output files at the job level when the job is aborted. Please note that this works as 'best-effort', not as a guarantee, as we have in HadoopMapReduceCommitProtocol.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #24186 from HeartSaVioR/SPARK-27254.
    
    Lead-authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Co-authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../streaming/ManifestFileCommitProtocol.scala     | 37 ++++++++++-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 74 ++++++++++++++++++++++
 2 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2d..f6cc811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
@@ -43,6 +44,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   @transient private var fileLog: FileStreamSinkLog = _
   private var batchId: Long = _
 
+  @transient private var pendingCommitFiles: ArrayBuffer[Path] = _
+
   /**
    * Sets up the manifest log output and the batch id for this job.
    * Must be called before any other function.
@@ -54,13 +57,21 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def setupJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    pendingCommitFiles = new ArrayBuffer[Path]
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
     val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
 
+    // We must not remove files that have already been written to the metadata:
+    // `fileLog.add(batchId, fileStatuses)` could fail AFTER writing files to the metadata,
+    // and there could also be a race,
+    // so to be safe we clear the list before calling anything that may throw an exception.
+    // This case is uncommon and cleanup is best effort rather than a guarantee, so keeping
+    // the logic here simple is OK, and it is safe when dealing with unexpected situations.
+    pendingCommitFiles.clear()
+
     if (fileLog.add(batchId, fileStatuses)) {
       logInfo(s"Committed batch $batchId")
     } else {
@@ -70,7 +81,29 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    // Best-effort cleanup of the complete files left behind by the failed job.
+    // Since each file has a UUID in its filename, it is safe to try deleting them,
+    // as a file will not conflict with a file from another attempt of the same task.
+    if (pendingCommitFiles.nonEmpty) {
+      pendingCommitFiles.foreach { path =>
+        try {
+          val fs = path.getFileSystem(jobContext.getConfiguration)
+          // Check existence first to make sure the file can be seen from the driver as well.
+          if (fs.exists(path)) {
+            fs.delete(path, false)
+          }
+        } catch {
+          case e: IOException =>
+            logWarning(s"Fail to remove temporary file $path, continue removing next.", e)
+        }
+      }
+      pendingCommitFiles.clear()
+    }
+  }
+
+  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
+      .map(_.toFileStatus.getPath)
   }
 
   override def setupTask(taskContext: TaskAttemptContext): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 7d343bb..edeb416 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -22,10 +22,13 @@ import java.nio.file.Files
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.JobContext
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
@@ -473,6 +476,77 @@ abstract class FileStreamSinkSuite extends StreamTest {
       assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.")
     }
   }
+
+  testQuietly("cleanup complete but invalid output for aborted job") {
+    withSQLConf(("spark.sql.streaming.commitProtocolClass",
+      classOf[PendingCommitFilesTrackingManifestFileCommitProtocol].getCanonicalName)) {
+      withTempDir { tempDir =>
+        val checkpointDir = new File(tempDir, "chk")
+        val outputDir = new File(tempDir, "output @#output")
+        val inputData = MemoryStream[Int]
+        inputData.addData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+        val q = inputData.toDS()
+          .repartition(10)
+          .map { value =>
+            // intentionally fail one task after some other tasks have succeeded
+            if (value == 5) {
+              // add some delay to let other tasks commit before this task fails
+              Thread.sleep(100)
+              value / 0
+            } else {
+              value
+            }
+          }
+          .writeStream
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .format("parquet")
+          .start(outputDir.getCanonicalPath)
+
+        intercept[StreamingQueryException] {
+          try {
+            q.processAllAvailable()
+          } finally {
+            q.stop()
+          }
+        }
+
+        import PendingCommitFilesTrackingManifestFileCommitProtocol._
+        val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
+          .filter(_.toString.endsWith(".parquet"))
+          .map(_.getFileName.toString)
+          .toSet
+        val trackingFileNames = tracking.map(new Path(_).getName).toSet
+
+        // A race condition is possible here:
+        // - some tasks may complete while abortJob is being called
+        // We can't delete the complete files of those tasks (that's OK since this is best effort)
+        assert(outputFileNames.intersect(trackingFileNames).isEmpty,
+          "abortJob should clean up files reported as successful.")
+      }
+    }
+  }
+}
+
+object PendingCommitFilesTrackingManifestFileCommitProtocol {
+  val tracking: ArrayBuffer[String] = new ArrayBuffer[String]()
+
+  def cleanPendingCommitFiles(): Unit = tracking.clear()
+  def addPendingCommitFiles(paths: Seq[String]): Unit = tracking ++= paths
+}
+
+class PendingCommitFilesTrackingManifestFileCommitProtocol(jobId: String, path: String)
+  extends ManifestFileCommitProtocol(jobId, path) {
+  import PendingCommitFilesTrackingManifestFileCommitProtocol._
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    super.setupJob(jobContext)
+    cleanPendingCommitFiles()
+  }
+
+  override def onTaskCommit(taskCommit: FileCommitProtocol.TaskCommitMessage): Unit = {
+    super.onTaskCommit(taskCommit)
+    addPendingCommitFiles(taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]].map(_.path))
+  }
 }
 
 class FileStreamSinkV1Suite extends FileStreamSinkSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org