You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/19 10:40:07 UTC

[incubator-streampark] branch dev updated: get jobId bug fixed. (#1650)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 092995c8b get jobId bug fixed. (#1650)
092995c8b is described below

commit 092995c8b784d76866fd5648cc35ed3d2b6e9e0a
Author: benjobs <be...@apache.org>
AuthorDate: Mon Sep 19 18:40:00 2022 +0800

    get jobId bug fixed. (#1650)
---
 .../streampark/common/conf/ConfigConst.scala       |  2 -
 .../console/core/entity/Application.java           |  1 +
 .../core/service/impl/ApplicationServiceImpl.java  |  6 +--
 .../console/core/task/FlinkTrackingTask.java       |  2 +
 .../core/task/K8sFlinkChangeEventListener.java     |  1 +
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 50 ++++++++++------------
 .../flink/submit/bean/SubmitRequest.scala          |  2 -
 .../impl/KubernetesNativeApplicationSubmit.scala   |  2 +-
 .../flink/submit/trait/FlinkSubmitTrait.scala      |  2 +-
 9 files changed, 31 insertions(+), 37 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index bc49cee95..832c153a9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -40,8 +40,6 @@ object ConfigConst {
 
   val KEY_JOB_ID = "jobId"
 
-  val KEY_FLINK_JOB_ID = "flinkJobId"
-
   val KEY_SEMANTIC = "semantic"
 
   /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 93e714102..88c78c051 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -96,6 +96,7 @@ public class Application implements Serializable {
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String appId;
 
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String jobId;
 
     /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index b4ebbf656..3395f89d8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -102,7 +102,6 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -1192,7 +1191,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         AssertUtils.state(application != null);
 
-        application.setJobId(new JobID().toHexString());
         // if manually started, clear the restart flag
         if (!auto) {
             application.setRestartCount(0);
@@ -1269,7 +1267,6 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         Map<String, Object> extraParameter = new HashMap<>(0);
         extraParameter.put(ConfigConst.KEY_JOB_ID(), application.getId());
-        extraParameter.put(ConfigConst.KEY_FLINK_JOB_ID(), application.getJobId());
 
         if (appParam.getAllowNonRestored()) {
             extraParameter.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
@@ -1388,6 +1385,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                     }
                 }
                 application.setAppId(submitResponse.clusterId());
+                if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
+                    application.setJobId(submitResponse.jobId());
+                }
 
                 if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
                     application.setJobManagerUrl(submitResponse.jobManagerUrl());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 4977c8e1b..64b406834 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -323,6 +323,8 @@ public class FlinkTrackingTask {
                 application.setEndTime(new Date(endTime));
             }
         }
+
+        application.setJobId(jobOverview.getId());
         application.setDuration(jobOverview.getDuration());
         application.setTotalTask(jobOverview.getTasks().getTotal());
         application.setOverview(jobOverview.getTasks());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
index cca06a2b5..858999b53 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
@@ -162,6 +162,7 @@ public class K8sFlinkChangeEventListener {
         app.setState(fromK8sFlinkJobState(state).getValue());
 
         // update relevant fields of Application from JobStatusCV
+        app.setJobId(jobStatus.jobId());
         app.setTotalTask(jobStatus.taskTotal());
         if (FlinkJobState.isEndState(state)) {
             app.setOptionState(OptionState.NONE.getValue());
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 339bf1bac..a5d9e7f4d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -119,7 +119,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
           }
         case _ =>
       })
-
       future
     }
 
@@ -196,10 +195,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     implicit val pollEmitTime: Long = System.currentTimeMillis
     val clusterId = trackId.clusterId
     val namespace = trackId.namespace
-    logger.debug("Enter the touchApplicationJob logic.")
     val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
     if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
-      logger.debug("The normal acquisition fails and the speculative logic is used.")
       inferApplicationFlinkJobStateFromK8sEvent(trackId)
     } else {
       Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
@@ -214,18 +211,16 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
     Try {
       val clusterRestUrl = trackController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return None)
       // list flink jobs from rest api
-      val v = callJobsOverviewsApi(clusterRestUrl)
-      logger.debug(s"The first visit was successful.")
-      v
+      callJobsOverviewsApi(clusterRestUrl)
     }.getOrElse {
-      logger.debug("Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
+      logger.warn("Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
       val clusterRestUrl = trackController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
       Try(callJobsOverviewsApi(clusterRestUrl)) match {
         case Success(s) =>
-          logger.debug("The retry is successful.")
+          logger.info("The retry is successful.")
           s
         case Failure(e) =>
-          logger.debug(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
+          logger.warn(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
           None
       }
     }
@@ -235,14 +230,12 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
    * list flink jobs details from rest api
    */
   private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
-    logger.debug(s"Try to access flink's service via http:${restUrl}/jobs/overview.")
     val jobDetails = JobDetails.as(
       Request.get(s"$restUrl/jobs/overview")
         .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
         .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
         .execute.returnContent().asString(StandardCharsets.UTF_8)
     )
-    logger.debug(s"Access flink's service through http success jobDetail:${jobDetails.toString}.")
     jobDetails
   }
 
@@ -254,10 +247,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
   private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)
                                                        (implicit pollEmitTime: Long): Option[JobStatusCV] = {
 
-    logger.debug("Inaccessible to flink the logic to judge the state.")
     // infer from k8s deployment and event
     val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
-    logger.debug(s"Query the local cache result:${trackController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
+    logger.info(s"Query the local cache result:${trackController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
     val jobState = {
       if (trackController.canceling.has(trackId)) FlinkJobState.CANCELED else {
         // whether deployment exists on kubernetes cluster
@@ -265,24 +257,26 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
         val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(trackId.namespace, trackId.clusterId)
         val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
 
-        if (isDeployExists && !deployStateOfTheError) {
-          logger.debug("Task Enter the initialization process.")
-          FlinkJobState.K8S_INITIALIZING
-        } else if (isDeployExists && deployStateOfTheError && isConnection) {
-          KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
-          KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
-          IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
-          logger.debug("Enter the task failure deletion process.")
-          FlinkJobState.FAILED
-        } else if (!isDeployExists && isConnection) {
-          logger.debug("The deployment is deleted and enters the task failure process.")
+        if (isDeployExists) {
+          if (!deployStateOfTheError) {
+            logger.info("Task Enter the initialization process.")
+            FlinkJobState.K8S_INITIALIZING
+          } else if (isConnection) {
+            logger.info("Enter the task failure deletion process.")
+            KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
+            KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
+            IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
+            FlinkJobState.FAILED
+          } else {
+            inferSilentOrLostFromPreCache(latest)
+          }
+        } else if (isConnection) {
+          logger.info("The deployment is deleted and enters the task failure process.")
           FlinkJobState.FAILED
-        }
-        else {
-          logger.debug("Enter the disconnected state process.")
-          // determine if the state should be SILENT or LOST
+        } else {
           inferSilentOrLostFromPreCache(latest)
         }
+
       }
     }
 
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index c501aacc4..e8461fd43 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -74,8 +74,6 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   lazy val jobID: String = extraParameter.get(KEY_JOB_ID).toString
 
-  lazy val flinkJobID: String = extraParameter.get(KEY_FLINK_JOB_ID).toString
-
   lazy val savepointRestoreSettings: SavepointRestoreSettings = {
     lazy val allowNonRestoredState = Try(extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
     savePoint match {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
index 92b2a3899..c21ff0c88 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeApplicationSubmit.scala
@@ -71,7 +71,7 @@ object KubernetesNativeApplicationSubmit extends KubernetesNativeSubmitTrait {
         .getClusterClient
 
       val clusterId = clusterClient.getClusterId
-      val result = SubmitResponse(clusterId, flinkConfig.toMap, submitRequest.flinkJobID)
+      val result = SubmitResponse(clusterId, flinkConfig.toMap)
       logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
       result
     } catch {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 719f02dfc..57ec45112 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -98,7 +98,7 @@ trait FlinkSubmitTrait extends Logger {
       .safeSet(CoreOptions.CLASSLOADER_RESOLVE_ORDER, submitRequest.resolveOrder.getName)
       .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
-      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.flinkJobID)
+      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, new JobID().toHexString)
 
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
     //state.checkpoints.num-retained