You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by lv...@apache.org on 2022/11/04 08:59:43 UTC
[incubator-streampark] branch dev updated: [Bug] flinkjob restore from savepoint bug fixed (#1962)
This is an automated email from the ASF dual-hosted git repository.
lvshaokang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 9223bcfd9 [Bug] flinkjob restore from savepoint bug fixed (#1962)
9223bcfd9 is described below
commit 9223bcfd9170f6a91ca088b99c00daefd5626458
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 4 16:59:37 2022 +0800
[Bug] flinkjob restore from savepoint bug fixed (#1962)
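Fixed a bug in restoring a Flink job from a savepoint: the savepoint path is now written to the Flink configuration, and the savepoint restore settings are passed to the program builder.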
---
.../apache/streampark/flink/submit/bean/SubmitRequest.scala | 9 ++++++++-
.../streampark/flink/submit/trait/FlinkSubmitTrait.scala | 11 +++++++----
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index ca79b68dc..d25311146 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -25,7 +25,7 @@ import org.apache.streampark.common.enums._
import org.apache.streampark.common.util.{DeflaterUtils, FlinkUtils, HdfsUtils, PropertiesUtils}
import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
import org.apache.commons.io.FileUtils
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}
import java.io.File
import java.util.{Map => JavaMap}
@@ -76,6 +76,13 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
+ lazy val savepointRestoreSettings: SavepointRestoreSettings = {
+ savePoint match {
+ case sp if Try(sp.isEmpty).getOrElse(true) => SavepointRestoreSettings.none
+ case sp => SavepointRestoreSettings.forPath(sp, allowNonRestoredState)
+ }
+ }
+
lazy val userJarFile: File = {
executionMode match {
case ExecutionMode.KUBERNETES_NATIVE_APPLICATION => null
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index a46d3006c..757ac7520 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -48,7 +48,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
-
trait FlinkSubmitTrait extends Logger {
private[submit] lazy val PARAM_KEY_FLINK_CONF = KEY_FLINK_CONF("--")
@@ -102,10 +101,13 @@ trait FlinkSubmitTrait extends Logger {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
//state.checkpoints.num-retained
val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
- flinkConfig.set(retainedOption, flinkDefaultConfiguration.get(retainedOption))
+ flinkConfig.safeSet(retainedOption, flinkDefaultConfiguration.get(retainedOption))
- //set savepoint.ignore-unclaimed-state parameter
- flinkConfig.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, submitRequest.allowNonRestoredState)
+ //set savepoint parameter
+ if (submitRequest.savePoint != null) {
+ flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
+ flinkConfig.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, submitRequest.allowNonRestoredState)
+ }
setConfig(submitRequest, flinkConfig)
@@ -164,6 +166,7 @@ trait FlinkSubmitTrait extends Logger {
.newBuilder
.setJarFile(jarFile)
.setEntryPointClassName(flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
+ .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
.setArguments(
flinkConfig
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)