You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by lv...@apache.org on 2022/11/04 08:59:43 UTC

[incubator-streampark] branch dev updated: [Bug] flinkjob restore from savepoint bug fixed (#1962)

This is an automated email from the ASF dual-hosted git repository.

lvshaokang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9223bcfd9 [Bug] flinkjob restore from savepoint bug fixed (#1962)
9223bcfd9 is described below

commit 9223bcfd9170f6a91ca088b99c00daefd5626458
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 4 16:59:37 2022 +0800

    [Bug] flinkjob restore from savepoint bug fixed (#1962)
    
    Fixed flinkjob restore from savepoint bug.
---
 .../apache/streampark/flink/submit/bean/SubmitRequest.scala   |  9 ++++++++-
 .../streampark/flink/submit/trait/FlinkSubmitTrait.scala      | 11 +++++++----
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index ca79b68dc..d25311146 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -25,7 +25,7 @@ import org.apache.streampark.common.enums._
 import org.apache.streampark.common.util.{DeflaterUtils, FlinkUtils, HdfsUtils, PropertiesUtils}
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
 import org.apache.commons.io.FileUtils
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}
 
 import java.io.File
 import java.util.{Map => JavaMap}
@@ -76,6 +76,13 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
 
+  lazy val savepointRestoreSettings: SavepointRestoreSettings = {
+    savePoint match {
+      case sp if Try(sp.isEmpty).getOrElse(true) => SavepointRestoreSettings.none
+      case sp => SavepointRestoreSettings.forPath(sp, allowNonRestoredState)
+    }
+  }
+
   lazy val userJarFile: File = {
     executionMode match {
       case ExecutionMode.KUBERNETES_NATIVE_APPLICATION => null
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index a46d3006c..757ac7520 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -48,7 +48,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
-
 trait FlinkSubmitTrait extends Logger {
 
   private[submit] lazy val PARAM_KEY_FLINK_CONF = KEY_FLINK_CONF("--")
@@ -102,10 +101,13 @@ trait FlinkSubmitTrait extends Logger {
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
     //state.checkpoints.num-retained
     val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
-    flinkConfig.set(retainedOption, flinkDefaultConfiguration.get(retainedOption))
+    flinkConfig.safeSet(retainedOption, flinkDefaultConfiguration.get(retainedOption))
 
-    //set savepoint.ignore-unclaimed-state parameter
-    flinkConfig.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, submitRequest.allowNonRestoredState)
+    //set savepoint parameter
+    if (submitRequest.savePoint != null) {
+      flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
+      flinkConfig.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, submitRequest.allowNonRestoredState)
+    }
 
     setConfig(submitRequest, flinkConfig)
 
@@ -164,6 +166,7 @@ trait FlinkSubmitTrait extends Logger {
       .newBuilder
       .setJarFile(jarFile)
       .setEntryPointClassName(flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get())
+      .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings)
       .setArguments(
         flinkConfig
           .getOptional(ApplicationConfiguration.APPLICATION_ARGS)