Posted to commits@spark.apache.org by do...@apache.org on 2019/02/08 18:23:15 UTC

[spark] branch master updated: [SPARK-26389][SS] Add force delete temp checkpoint configuration

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 701b06a  [SPARK-26389][SS] Add force delete temp checkpoint configuration
701b06a is described below

commit 701b06a7e2e76e5d9ed020c62e0ed3464fa2818b
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Fri Feb 8 10:22:51 2019 -0800

    [SPARK-26389][SS] Add force delete temp checkpoint configuration
    
    ## What changes were proposed in this pull request?
    
    Not all users want to keep temporary checkpoint directories, and it is hard to restore from them.
    
    In this PR I've added a force delete flag which defaults to `false`. Additionally, it is not clear to users when a temporary checkpoint directory is deleted, so I've added log messages to explain this a bit more.
    
    ## How was this patch tested?
    
    Existing + additional unit tests.
    
    Closes #23732 from gaborgsomogyi/SPARK-26389.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 ++++
 .../sql/execution/streaming/StreamExecution.scala  | 11 ++++++--
 .../sql/streaming/StreamingQueryManager.scala      |  8 ++++--
 .../test/DataStreamReaderWriterSuite.scala         | 32 +++++++++++++++++++++-
 4 files changed, 51 insertions(+), 6 deletions(-)
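
For readers who want to try the new flag end to end, a minimal sketch (the rate source, local master, and console sink are illustrative choices, not part of this patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ForceDeleteTempCheckpointExample")
      // Opt in: remove the temp checkpoint dir even on failure (default: false).
      .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
      .getOrCreate()

    // No checkpointLocation option is given, so Spark creates a temporary one.
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .start()

    query.stop()   // the temporary checkpoint directory is removed on stop
    spark.stop()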

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 11e1a5e..d285e00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -907,6 +907,12 @@ object SQLConf {
     .stringConf
     .createOptional
 
+  val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION =
+    buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation")
+      .doc("When true, enable temporary checkpoint locations force delete.")
+      .booleanConf
+      .createWithDefault(false)
+
   val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
     .internal()
     .doc("The minimum number of batches that must be retained and made recoverable.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 90f7b47..dc9ed80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -55,7 +55,8 @@ case object RECONFIGURING extends State
  * and the results are committed transactionally to the given [[Sink]].
  *
  * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
- *                               errors
+ *                               errors. Checkpoint deletion can be forced with the appropriate
+ *                               Spark configuration.
  */
 abstract class StreamExecution(
     override val sparkSession: SparkSession,
@@ -92,6 +93,7 @@ abstract class StreamExecution(
     fs.mkdirs(checkpointPath)
     checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
   }
+  logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
 
   def logicalPlan: LogicalPlan
 
@@ -335,10 +337,13 @@ abstract class StreamExecution(
         postEvent(
           new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
 
-        // Delete the temp checkpoint only when the query didn't fail
-        if (deleteCheckpointOnStop && exception.isEmpty) {
+        // Delete the temp checkpoint when force delete is enabled or the query didn't fail
+        if (deleteCheckpointOnStop &&
+            (sparkSession.sessionState.conf
+              .getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || exception.isEmpty)) {
           val checkpointPath = new Path(resolvedCheckpointRoot)
           try {
+            logInfo(s"Deleting checkpoint $checkpointPath.")
             val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
             fs.delete(checkpointPath, true)
           } catch {
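
The guard above distills to a small predicate; the helper name below is illustrative, not part of the patch:

    // forceDelete is SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION and
    // queryFailed corresponds to exception.nonEmpty in StreamExecution.
    def shouldDeleteCheckpoint(
        deleteCheckpointOnStop: Boolean,  // true only for temp checkpoint dirs
        forceDelete: Boolean,
        queryFailed: Boolean): Boolean =
      deleteCheckpointOnStop && (forceDelete || !queryFailed)

    assert(shouldDeleteCheckpoint(true, forceDelete = false, queryFailed = false)) // clean stop
    assert(!shouldDeleteCheckpoint(true, forceDelete = false, queryFailed = true)) // kept on failure
    assert(shouldDeleteCheckpoint(true, forceDelete = true, queryFailed = true))   // forced delete
    assert(!shouldDeleteCheckpoint(false, forceDelete = true, queryFailed = true)) // user-provided dir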
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 881cd96..cb9ca4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -221,9 +221,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       }
     }.getOrElse {
       if (useTempCheckpointLocation) {
-        // Delete the temp checkpoint when a query is being stopped without errors.
         deleteCheckpointOnStop = true
-        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        logWarning("Temporary checkpoint location created which is deleted normally when" +
+          s" the query didn't fail: $tempDir. If it's required to delete it under any" +
+          s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
+          s" true. Important to know deleting temp checkpoint folder is best effort.")
+        tempDir
       } else {
         throw new AnalysisException(
           "checkpointLocation must be specified either " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 74ea0bf..c3c7dcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -614,6 +614,21 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("configured checkpoint dir should not be deleted if a query is stopped without errors and" +
+    " force temp checkpoint deletion enabled") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath,
+        SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+        val ds = MemoryStream[Int].toDS
+        val query = ds.writeStream.format("console").start()
+        assert(checkpointPath.exists())
+        query.stop()
+        assert(checkpointPath.exists())
+      }
+    }
+  }
+
   test("temp checkpoint dir should be deleted if a query is stopped without errors") {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
@@ -627,6 +642,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
   }
 
   testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
+    testTempCheckpointWithFailedQuery(false)
+  }
+
+  testQuietly("temp checkpoint should be deleted if a query is stopped with an error and force" +
+    " temp checkpoint deletion enabled") {
+    withSQLConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+      testTempCheckpointWithFailedQuery(true)
+    }
+  }
+
+  private def testTempCheckpointWithFailedQuery(checkpointMustBeDeleted: Boolean): Unit = {
     import testImplicits._
     val input = MemoryStream[Int]
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
@@ -638,7 +664,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     intercept[StreamingQueryException] {
       query.awaitTermination()
     }
-    assert(fs.exists(checkpointDir))
+    if (!checkpointMustBeDeleted) {
+      assert(fs.exists(checkpointDir))
+    } else {
+      assert(!fs.exists(checkpointDir))
+    }
   }
 
   test("SPARK-20431: Specify a schema by using a DDL-formatted string") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org