You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/02/08 18:23:15 UTC
[spark] branch master updated: [SPARK-26389][SS] Add force delete temp checkpoint configuration
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 701b06a [SPARK-26389][SS] Add force delete temp checkpoint configuration
701b06a is described below
commit 701b06a7e2e76e5d9ed020c62e0ed3464fa2818b
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Fri Feb 8 10:22:51 2019 -0800
[SPARK-26389][SS] Add force delete temp checkpoint configuration
## What changes were proposed in this pull request?
Not all users want to keep temporary checkpoint directories. Additionally, it is hard to restore a query from them.
In this PR I've added a force-delete flag, which defaults to `false`. Additionally, it is not clear to users when the temporary checkpoint directory is deleted, so I've added log messages to explain this a bit more.
## How was this patch tested?
Existing unit tests, plus additional unit tests for the new configuration.
Closes #23732 from gaborgsomogyi/SPARK-26389.
Authored-by: Gabor Somogyi <ga...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 6 ++++
.../sql/execution/streaming/StreamExecution.scala | 11 ++++++--
.../sql/streaming/StreamingQueryManager.scala | 8 ++++--
.../test/DataStreamReaderWriterSuite.scala | 32 +++++++++++++++++++++-
4 files changed, 51 insertions(+), 6 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 11e1a5e..d285e00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -907,6 +907,12 @@ object SQLConf {
.stringConf
.createOptional
+ val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION =
+ buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation")
+ .doc("When true, enable temporary checkpoint locations force delete.")
+ .booleanConf
+ .createWithDefault(false)
+
val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
.internal()
.doc("The minimum number of batches that must be retained and made recoverable.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 90f7b47..dc9ed80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -55,7 +55,8 @@ case object RECONFIGURING extends State
* and the results are committed transactionally to the given [[Sink]].
*
* @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
- * errors
+ * errors. Checkpoint deletion can be forced with the appropriate
+ * Spark configuration.
*/
abstract class StreamExecution(
override val sparkSession: SparkSession,
@@ -92,6 +93,7 @@ abstract class StreamExecution(
fs.mkdirs(checkpointPath)
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
}
+ logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
def logicalPlan: LogicalPlan
@@ -335,10 +337,13 @@ abstract class StreamExecution(
postEvent(
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
- // Delete the temp checkpoint only when the query didn't fail
- if (deleteCheckpointOnStop && exception.isEmpty) {
+ // Delete the temp checkpoint when either force delete enabled or the query didn't fail
+ if (deleteCheckpointOnStop &&
+ (sparkSession.sessionState.conf
+ .getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || exception.isEmpty)) {
val checkpointPath = new Path(resolvedCheckpointRoot)
try {
+ logInfo(s"Deleting checkpoint $checkpointPath.")
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
fs.delete(checkpointPath, true)
} catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 881cd96..cb9ca4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -221,9 +221,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
}.getOrElse {
if (useTempCheckpointLocation) {
- // Delete the temp checkpoint when a query is being stopped without errors.
deleteCheckpointOnStop = true
- Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+ val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+ logWarning("Temporary checkpoint location created which is deleted normally when" +
+ s" the query didn't fail: $tempDir. If it's required to delete it under any" +
+ s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
+ s" true. Important to know deleting temp checkpoint folder is best effort.")
+ tempDir
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 74ea0bf..c3c7dcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -614,6 +614,21 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("configured checkpoint dir should not be deleted if a query is stopped without errors and" +
+ " force temp checkpoint deletion enabled") {
+ import testImplicits._
+ withTempDir { checkpointPath =>
+ withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath,
+ SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+ val ds = MemoryStream[Int].toDS
+ val query = ds.writeStream.format("console").start()
+ assert(checkpointPath.exists())
+ query.stop()
+ assert(checkpointPath.exists())
+ }
+ }
+ }
+
test("temp checkpoint dir should be deleted if a query is stopped without errors") {
import testImplicits._
val query = MemoryStream[Int].toDS.writeStream.format("console").start()
@@ -627,6 +642,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
+ testTempCheckpointWithFailedQuery(false)
+ }
+
+ testQuietly("temp checkpoint should be deleted if a query is stopped with an error and force" +
+ " temp checkpoint deletion enabled") {
+ withSQLConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+ testTempCheckpointWithFailedQuery(true)
+ }
+ }
+
+ private def testTempCheckpointWithFailedQuery(checkpointMustBeDeleted: Boolean): Unit = {
import testImplicits._
val input = MemoryStream[Int]
val query = input.toDS.map(_ / 0).writeStream.format("console").start()
@@ -638,7 +664,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
intercept[StreamingQueryException] {
query.awaitTermination()
}
- assert(fs.exists(checkpointDir))
+ if (!checkpointMustBeDeleted) {
+ assert(fs.exists(checkpointDir))
+ } else {
+ assert(!fs.exists(checkpointDir))
+ }
}
test("SPARK-20431: Specify a schema by using a DDL-formatted string") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org