You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/19 10:40:07 UTC
[incubator-streampark] branch dev updated: get jobId bug fixed. (#1650)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 092995c8b get jobId bug fixed. (#1650)
092995c8b is described below
commit 092995c8b784d76866fd5648cc35ed3d2b6e9e0a
Author: benjobs <be...@apache.org>
AuthorDate: Mon Sep 19 18:40:00 2022 +0800
get jobId bug fixed. (#1650)
---
.../streampark/common/conf/ConfigConst.scala | 2 -
.../console/core/entity/Application.java | 1 +
.../core/service/impl/ApplicationServiceImpl.java | 6 +--
.../console/core/task/FlinkTrackingTask.java | 2 +
.../core/task/K8sFlinkChangeEventListener.java | 1 +
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 50 ++++++++++------------
.../flink/submit/bean/SubmitRequest.scala | 2 -
.../impl/KubernetesNativeApplicationSubmit.scala | 2 +-
.../flink/submit/trait/FlinkSubmitTrait.scala | 2 +-
9 files changed, 31 insertions(+), 37 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index bc49cee95..832c153a9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -40,8 +40,6 @@ object ConfigConst {
val KEY_JOB_ID = "jobId"
- val KEY_FLINK_JOB_ID = "flinkJobId"
-
val KEY_SEMANTIC = "semantic"
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 93e714102..88c78c051 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -96,6 +96,7 @@ public class Application implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String appId;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobId;
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index b4ebbf656..3395f89d8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -102,7 +102,6 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.springframework.beans.factory.annotation.Autowired;
@@ -1192,7 +1191,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
AssertUtils.state(application != null);
- application.setJobId(new JobID().toHexString());
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -1269,7 +1267,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
Map<String, Object> extraParameter = new HashMap<>(0);
extraParameter.put(ConfigConst.KEY_JOB_ID(), application.getId());
- extraParameter.put(ConfigConst.KEY_FLINK_JOB_ID(), application.getJobId());
if (appParam.getAllowNonRestored()) {
extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
@@ -1388,6 +1385,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
application.setAppId(submitResponse.clusterId());
+ if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
+ application.setJobId(submitResponse.jobId());
+ }
if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
application.setJobManagerUrl(submitResponse.jobManagerUrl());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 4977c8e1b..64b406834 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -323,6 +323,8 @@ public class FlinkTrackingTask {
application.setEndTime(new Date(endTime));
}
}
+
+ application.setJobId(jobOverview.getId());
application.setDuration(jobOverview.getDuration());
application.setTotalTask(jobOverview.getTasks().getTotal());
application.setOverview(jobOverview.getTasks());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
index cca06a2b5..858999b53 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
@@ -162,6 +162,7 @@ public class K8sFlinkChangeEventListener {
app.setState(fromK8sFlinkJobState(state).getValue());
// update relevant fields of Application from JobStatusCV
+ app.setJobId(jobStatus.jobId());
app.setTotalTask(jobStatus.taskTotal());
if (FlinkJobState.isEndState(state)) {
app.setOptionState(OptionState.NONE.getValue());
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 339bf1bac..a5d9e7f4d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -119,7 +119,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
}
case _ =>
})
-
future
}
@@ -196,10 +195,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
implicit val pollEmitTime: Long = System.currentTimeMillis
val clusterId = trackId.clusterId
val namespace = trackId.namespace
- logger.debug("Enter the touchApplicationJob logic.")
val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
- logger.debug("The normal acquisition fails and the speculative logic is used.")
inferApplicationFlinkJobStateFromK8sEvent(trackId)
} else {
Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
@@ -214,18 +211,16 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
Try {
val clusterRestUrl = trackController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return None)
// list flink jobs from rest api
- val v = callJobsOverviewsApi(clusterRestUrl)
- logger.debug(s"The first visit was successful.")
- v
+ callJobsOverviewsApi(clusterRestUrl)
}.getOrElse {
- logger.debug("Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
+ logger.warn("Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
val clusterRestUrl = trackController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
Try(callJobsOverviewsApi(clusterRestUrl)) match {
case Success(s) =>
- logger.debug("The retry is successful.")
+ logger.info("The retry is successful.")
s
case Failure(e) =>
- logger.debug(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
+ logger.warn(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
None
}
}
@@ -235,14 +230,12 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* list flink jobs details from rest api
*/
private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
- logger.debug(s"Try to access flink's service via http:${restUrl}/jobs/overview.")
val jobDetails = JobDetails.as(
Request.get(s"$restUrl/jobs/overview")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.execute.returnContent().asString(StandardCharsets.UTF_8)
)
- logger.debug(s"Access flink's service through http success jobDetail:${jobDetails.toString}.")
jobDetails
}
@@ -254,10 +247,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)
(implicit pollEmitTime: Long): Option[JobStatusCV] = {
- logger.debug("Inaccessible to flink the logic to judge the state.")
// infer from k8s deployment and event
val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
- logger.debug(s"Query the local cache result:${trackController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
+ logger.info(s"Query the local cache result:${trackController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
val jobState = {
if (trackController.canceling.has(trackId)) FlinkJobState.CANCELED else {
// whether deployment exists on kubernetes cluster
@@ -265,24 +257,26 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(trackId.namespace, trackId.clusterId)
val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
- if (isDeployExists && !deployStateOfTheError) {
- logger.debug("Task Enter the initialization process.")
- FlinkJobState.K8S_INITIALIZING
- } else if (isDeployExists && deployStateOfTheError && isConnection) {
- KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
- KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
- IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
- logger.debug("Enter the task failure deletion process.")
- FlinkJobState.FAILED
- } else if (!isDeployExists && isConnection) {
- logger.debug("The deployment is deleted and enters the task failure process.")
+ if (isDeployExists) {
+ if (!deployStateOfTheError) {
+ logger.info("Task Enter the initialization process.")
+ FlinkJobState.K8S_INITIALIZING
+ } else if (isConnection) {
+ logger.info("Enter the task failure deletion process.")
+ KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
+ KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
+ IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
+ FlinkJobState.FAILED
+ } else {
+ inferSilentOrLostFromPreCache(latest)
+ }
+ } else if (isConnection) {
+ logger.info("The deployment is deleted and enters the task failure process.")
FlinkJobState.FAILED
- }
- else {
- logger.debug("Enter the disconnected state process.")
- // determine if the state should be SILENT or LOST
+ } else {
inferSilentOrLostFromPreCache(latest)
}
+
}
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index c501aacc4..e8461fd43 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -74,8 +74,6 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
lazy val jobID: String = extraParameter.get(KEY_JOB_ID).toString
- lazy val flinkJobID: String = extraParameter.get(KEY_FLINK_JOB_ID).toString
-
lazy val savepointRestoreSettings: SavepointRestoreSettings = {
lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index 92b2a3899..c21ff0c88 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -71,7 +71,7 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
.getClusterClient
val clusterId = clusterClient.getClusterId
- val result = SubmitResponse(clusterId, flinkConfig.toMap, submitRequest.flinkJobID)
+ val result = SubmitResponse(clusterId, flinkConfig.toMap)
logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
result
} catch {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 719f02dfc..57ec45112 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -98,7 +98,7 @@ trait FlinkSubmitTrait extends Logger {
.safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER, submitRequest.resolveOrder.getName)
.safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
.safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
- .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.flinkJobID)
+ .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, new JobID().toHexString)
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
//state.checkpoints.num-retained