You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/10/22 14:05:02 UTC
[incubator-streampark] branch dev updated: [bug] Resolve the problem that the flink jobid stored in the trackid is empty (#1877)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new e6ff98ccc [bug] Resolve the problem that the flink jobid stored in the trackid is empty (#1877)
e6ff98ccc is described below
commit e6ff98cccc93c589a9c3ed1f98e6d5cdb9248a7a
Author: monster <60...@users.noreply.github.com>
AuthorDate: Sat Oct 22 22:04:57 2022 +0800
[bug] Resolve the problem that the flink jobid stored in the trackid is empty (#1877)
* [bug] Resolve the problem that the flink jobid stored in the trackid is empty
---
.../main/scala/org/apache/streampark/common/conf/ConfigConst.scala | 2 --
.../scala/org/apache/streampark/common/enums/ApplicationType.java | 1 -
.../scala/org/apache/streampark/common/enums/DevelopmentMode.java | 1 -
.../main/scala/org/apache/streampark/common/enums/ExecutionMode.java | 2 --
.../src/main/scala/org/apache/streampark/common/enums/Semantic.java | 2 --
.../streampark/console/core/service/impl/ApplicationServiceImpl.java | 4 +++-
.../streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java | 2 +-
.../streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 2 +-
.../scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala | 4 ++--
.../flink/submit/impl/KubernetesNativeApplicationSubmit.scala | 2 +-
.../apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala | 2 +-
.../org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala | 2 +-
12 files changed, 10 insertions(+), 16 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 4129f078c..2ece108a8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -46,8 +46,6 @@ object ConfigConst {
val KEY_TIMEOUT = "timeout"
- val KEY_JOB_ID = "jobId"
-
val KEY_SEMANTIC = "semantic"
/**
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java
index a40675d3f..cf2b223f6 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ApplicationType.java
@@ -19,7 +19,6 @@ package org.apache.streampark.common.enums;
import java.io.Serializable;
-
public enum ApplicationType implements Serializable {
/**
* StreamPark Flink
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java
index 4718a01ee..1779df59a 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/DevelopmentMode.java
@@ -19,7 +19,6 @@ package org.apache.streampark.common.enums;
import java.io.Serializable;
-
public enum DevelopmentMode implements Serializable {
/**
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java
index d608d03c6..c2154354c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/ExecutionMode.java
@@ -22,8 +22,6 @@ import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
-
-
public enum ExecutionMode implements Serializable {
/**
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java b/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java
index 178c50a50..1cf68774d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/enums/Semantic.java
@@ -19,8 +19,6 @@ package org.apache.streampark.common.enums;
import java.io.Serializable;
-
-
public enum Semantic implements Serializable {
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index acaa79f10..c6d7def33 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -106,6 +106,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -1293,7 +1294,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
Map<String, String> dynamicOption = FlinkSubmitter.extractDynamicOptionAsJava(application.getDynamicOptions());
Map<String, Object> extraParameter = new HashMap<>(0);
- extraParameter.put(ConfigConst.KEY_JOB_ID(), application.getId());
if (appParam.getAllowNonRestored()) {
extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
@@ -1386,6 +1386,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
DevelopmentMode.of(application.getJobType()),
ExecutionMode.of(application.getExecutionMode()),
resolveOrder,
+ application.getId(),
+ new JobID().toHexString(),
application.getJobName(),
appConf,
application.getApplicationType(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java
index 9bba63e9d..c54097ca6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkTrackMonitorWrapper.java
@@ -131,7 +131,7 @@ public class K8sFlinkTrackMonitorWrapper {
public static TrackId toTrackId(@Nonnull Application app) {
Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
- return TrackId.onApplication(app.getK8sNamespace(), app.getClusterId(), app.getId(), null);
+ return TrackId.onApplication(app.getK8sNamespace(), app.getClusterId(), app.getId(), app.getJobId());
} else if (FlinkK8sExecuteMode.SESSION().equals(mode)) {
return TrackId.onSession(app.getK8sNamespace(), app.getClusterId(), app.getId(), app.getJobId());
} else {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index a5d9e7f4d..f7649f0f9 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -282,7 +282,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val jobStatusCV = JobStatusCV(
jobState = jobState,
- jobId = null,
+ jobId = trackId.jobId,
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis
)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index e8461fd43..d3f9e5fc8 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -47,6 +47,8 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
developmentMode: DevelopmentMode,
executionMode: ExecutionMode,
resolveOrder: ResolveOrder,
+ id: Long,
+ jobId: String,
appName: String,
appConf: String,
applicationType: ApplicationType,
@@ -72,8 +74,6 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString
- lazy val jobID: String = extraParameter.get(KEY_JOB_ID).toString
-
lazy val savepointRestoreSettings: SavepointRestoreSettings = {
lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index c21ff0c88..c040e3576 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -71,7 +71,7 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
.getClusterClient
val clusterId = clusterClient.getClusterId
- val result = SubmitResponse(clusterId, flinkConfig.toMap)
+ val result = SubmitResponse(clusterId, flinkConfig.toMap, submitRequest.jobId)
logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
result
} catch {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
index 278c271c3..baf7c059b 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
@@ -73,7 +73,7 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
case Array(1, 15, _) => array += s"${workspace.APP_SHIMS}/flink-1.15"
case _ => throw new UnsupportedOperationException(s"Unsupported flink version: ${submitRequest.flinkVersion}")
}
- val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.jobID}/lib"
+ val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
if (HdfsUtils.exists(jobLib)) {
array += jobLib
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 93e00d147..3db09b706 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -98,7 +98,7 @@ trait FlinkSubmitTrait extends Logger {
.safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER, submitRequest.resolveOrder.getName)
.safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
.safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
- .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, new JobID().toHexString)
+ .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
//state.checkpoints.num-retained