You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/10/22 14:05:02 UTC

[incubator-streampark] branch dev updated: [bug] Resolve the problem that the flink jobid stored in the trackid is empty (#1877)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new e6ff98ccc [bug] Resolve the problem that the flink jobid stored in the trackid is empty (#1877)
e6ff98ccc is described below

commit e6ff98cccc93c589a9c3ed1f98e6d5cdb9248a7a
Author: monster <60...@users.noreply.github.com>
AuthorDate: Sat Oct 22 22:04:57 2022 +0800

    [bug] Resolve the problem that the flink jobid stored in the trackid is empty (#1877)
    
    * [bug]Resolve the problem that the flink jobid stored in the trackid is empty
---
 .../main/scala/org/apache/streampark/common/conf/ConfigConst.scala    | 2 --
 .../scala/org/apache/streampark/common/enums/ApplicationType.java     | 1 -
 .../scala/org/apache/streampark/common/enums/DevelopmentMode.java     | 1 -
 .../main/scala/org/apache/streampark/common/enums/ExecutionMode.java  | 2 --
 .../src/main/scala/org/apache/streampark/common/enums/Semantic.java   | 2 --
 .../streampark/console/core/service/impl/ApplicationServiceImpl.java  | 4 +++-
 .../streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java     | 2 +-
 .../streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala   | 2 +-
 .../scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala | 4 ++--
 .../flink/submit/impl/KubernetesNativeApplicationSubmit.scala         | 2 +-
 .../apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala   | 2 +-
 .../org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala   | 2 +-
 12 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 4129f078c..2ece108a8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -46,8 +46,6 @@ object ConfigConst {
 
   val KEY_TIMEOUT = "timeout"
 
-  val KEY_JOB_ID = "jobId"
-
   val KEY_SEMANTIC = "semantic"
 
   /**
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java
index a40675d3f..cf2b223f6 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java
@@ -19,7 +19,6 @@ package org.apache.streampark.common.enums;
 
 import java.io.Serializable;
 
-
 public enum ApplicationType implements Serializable {
     /**
      * StreamPark Flink
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java
index 4718a01ee..1779df59a 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java
@@ -19,7 +19,6 @@ package org.apache.streampark.common.enums;
 
 import java.io.Serializable;
 
-
 public enum DevelopmentMode implements Serializable {
 
     /**
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java
index d608d03c6..c2154354c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java
@@ -22,8 +22,6 @@ import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.List;
 
-
-
 public enum ExecutionMode implements Serializable {
 
     /**
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java
index 178c50a50..1cf68774d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java
@@ -19,8 +19,6 @@ package org.apache.streampark.common.enums;
 
 import java.io.Serializable;
 
-
-
 public enum Semantic implements Serializable {
 
     /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index acaa79f10..c6d7def33 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -106,6 +106,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -1293,7 +1294,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         Map<String, String> dynamicOption = FlinkSubmitter.extractDynamicOptionAsJava(application.getDynamicOptions());
 
         Map<String, Object> extraParameter = new HashMap<>(0);
-        extraParameter.put(ConfigConst.KEY_JOB_ID(), application.getId());
 
         if (appParam.getAllowNonRestored()) {
             extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
@@ -1386,6 +1386,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             DevelopmentMode.of(application.getJobType()),
             ExecutionMode.of(application.getExecutionMode()),
             resolveOrder,
+            application.getId(),
+            new JobID().toHexString(),
             application.getJobName(),
             appConf,
             application.getApplicationType(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java
index 9bba63e9d..c54097ca6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java
@@ -131,7 +131,7 @@ public class K8sFlinkTrackMonitorWrapper {
         public static TrackId toTrackId(@Nonnull Application app) {
             Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
             if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
-                return TrackId.onApplication(app.getK8sNamespace(), app.getClusterId(), app.getId(), null);
+                return TrackId.onApplication(app.getK8sNamespace(), app.getClusterId(), app.getId(), app.getJobId());
             } else if (FlinkK8sExecuteMode.SESSION().equals(mode)) {
                 return TrackId.onSession(app.getK8sNamespace(), app.getClusterId(), app.getId(), app.getJobId());
             } else {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index a5d9e7f4d..f7649f0f9 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -282,7 +282,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
 
     val jobStatusCV = JobStatusCV(
       jobState = jobState,
-      jobId = null,
+      jobId = trackId.jobId,
       pollEmitTime = pollEmitTime,
       pollAckTime = System.currentTimeMillis
     )
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index e8461fd43..d3f9e5fc8 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -47,6 +47,8 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
                          developmentMode: DevelopmentMode,
                          executionMode: ExecutionMode,
                          resolveOrder: ResolveOrder,
+                         id: Long,
+                         jobId: String,
                          appName: String,
                          appConf: String,
                          applicationType: ApplicationType,
@@ -72,8 +74,6 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
 
-  lazy val jobID: String = extraParameter.get(KEY_JOB_ID).toString
-
   lazy val savepointRestoreSettings: SavepointRestoreSettings = {
     lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
     savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index c21ff0c88..c040e3576 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -71,7 +71,7 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
         .getClusterClient
 
       val clusterId = clusterClient.getClusterId
-      val result = SubmitResponse(clusterId, flinkConfig.toMap)
+      val result = SubmitResponse(clusterId, flinkConfig.toMap, submitRequest.jobId)
       logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
       result
     } catch {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
index 278c271c3..baf7c059b 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
@@ -73,7 +73,7 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
             case Array(1, 15, _) => array += s"${workspace.APP_SHIMS}/flink-1.15"
             case _ => throw new UnsupportedOperationException(s"Unsupported flink version: ${submitRequest.flinkVersion}")
           }
-          val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.jobID}/lib"
+          val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
           if (HdfsUtils.exists(jobLib)) {
             array += jobLib
           }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 93e00d147..3db09b706 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -98,7 +98,7 @@ trait FlinkSubmitTrait extends Logger {
       .safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER, submitRequest.resolveOrder.getName)
       .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
-      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, new JobID().toHexString)
+      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)
 
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
     //state.checkpoints.num-retained