You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/15 02:09:14 UTC

[incubator-streampark] branch dev updated: [Improve] code format and translate comments to EN. (#1607)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new bf52c5f89 [Improve] code format and translate comments to EN. (#1607)
bf52c5f89 is described below

commit bf52c5f89c279eae6014d1f60ca32aa57fafa78c
Author: lvshaokang <lv...@hotmail.com>
AuthorDate: Thu Sep 15 10:09:08 2022 +0800

    [Improve] code format and translate comments to EN. (#1607)
    
    * test
    
    * code format
---
 deploy/docker/spark/Dockerfile                     |   1 -
 .../core/service/impl/ApplicationServiceImpl.java  |   6 +-
 .../console/core/task/FlinkTrackingTask.java       | 184 +++++++++------------
 3 files changed, 85 insertions(+), 106 deletions(-)

diff --git a/deploy/docker/spark/Dockerfile b/deploy/docker/spark/Dockerfile
index b4755721c..4d05fa0b4 100644
--- a/deploy/docker/spark/Dockerfile
+++ b/deploy/docker/spark/Dockerfile
@@ -16,7 +16,6 @@
 #
 
 FROM docker.io/bitnami/spark:3.2.0
-LABEL maintainer="ning.guo <ni...@gmail.com>"
 LABEL description="Docker image with Spark (3.2.0) and Hadoop (3.3.2) and Hive (3.1.3), based on bitnami/spark:3.2.0. \
 For more information, please visit https://github.com/s1mplecc/spark-hadoop-docker."
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 1c0d60251..6fb414953 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -433,8 +433,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         if (!FlinkAppState.CANCELED.equals(state)) {
             return false;
         }
-        Long useId = FlinkTrackingTask.getCanlledJobUserId(appId);
-        if (useId == null || application.getUserId().longValue() != FlinkTrackingTask.getCanlledJobUserId(appId).longValue()) {
+        Long useId = FlinkTrackingTask.getCanceledJobUserId(appId);
+        if (useId == null || application.getUserId().longValue() != FlinkTrackingTask.getCanceledJobUserId(appId).longValue()) {
             return true;
         }
         return false;
@@ -1057,7 +1057,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         Long userId = commonService.getCurrentUser().getUserId();
         if (!application.getUserId().equals(userId)) {
-            FlinkTrackingTask.addCanlledApp(application.getId(), userId);
+            FlinkTrackingTask.addCanceledApp(application.getId(), userId);
         }
 
         CancelRequest cancelRequest = new CancelRequest(
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index e709f6713..796e034b7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -77,7 +77,7 @@ public class FlinkTrackingTask {
 
     // track interval  every 5 seconds
     private static final long TRACK_INTERVAL = 1000L * 5;
-    //option interval within 10 seconds
+    // option interval within 10 seconds
     private static final long OPTION_INTERVAL = 1000L * 10;
 
     /**
@@ -102,22 +102,23 @@ public class FlinkTrackingTask {
 
     /**
      * <pre>
-     * StopFrom: 用来记录任务是从StreamPark web管理端停止的还是其他方式停止
-     * 如从StreamPark web管理端停止可以知道在停止任务时是否做savepoint,如做了savepoint,则将该savepoint设置为最后有效的savepoint,下次启动时,自动选择从该savepoint
-     * 如:其他方式停止则,无法知道是否savepoint,直接将所有的savepoint设置为过期,任务再次启动时需要手动指定
+     * StopFrom: records whether a task was stopped from the StreamPark web UI or by some other means.
+     * If it was stopped from the StreamPark web UI, we know whether a savepoint was made when the task was stopped; if so,
+     * that savepoint is set as the last effective savepoint, and on the next start it is chosen automatically.
+     * If the task was stopped by other means, there is no way to know whether a savepoint was made, so all savepoints
+     * are marked as expired and a savepoint must be specified manually when the task is started again.
      * </pre>
      */
     private static final Map<Long, StopFrom> STOP_FROM_MAP = new ConcurrentHashMap<>(0);
 
     /**
-     * 检查到正在canceling的任务放到该cache中,过期时间为10秒(2次任务监控轮询的时间).
+     * Tasks detected to be in the canceling state are placed in this cache, with an expiration time of 10 seconds (the duration of 2 monitoring polls).
      */
     private static final Cache<Long, Byte> CANCELING_CACHE = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
 
     /**
-     * 任务取消跟踪列表,跟踪任务是由谁取消
-     * key:applicationId
-     * value:userId
+     * Tracking list of canceled tasks, recording which user canceled each task.
+     * Map<applicationId,userId>
      */
     private static final Map<Long, Long> CANCELLED_JOB_MAP = new ConcurrentHashMap<>(0);
 
@@ -136,9 +137,6 @@ public class FlinkTrackingTask {
     @Autowired
     private CheckpointProcessor checkpointProcessor;
 
-    /**
-     * 常用版本更新
-     */
     private static final Map<Long, FlinkEnv> FLINK_ENV_MAP = new ConcurrentHashMap<>(0);
 
     private static final Map<Long, FlinkCluster> FLINK_CLUSTER_MAP = new ConcurrentHashMap<>(0);
@@ -178,21 +176,22 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * <p> <strong> NOTE: 执行必须满足以下条件</strong>
-     * <p> <strong>1) 工程刚启动或者管理端页面正常操作任务(启动|停止),该操作需要非常实时的返回状态,频率1秒一次,持续10秒种(10次)</strong></p>
-     * <p> <strong>2) 正常的状态信息获取,5秒执行一次</strong></p>
+     * <p> <strong>NOTE: The following conditions must be met for execution</strong>
+     * <p> <strong>1) When the program has just started, or a start/stop operation is in progress on the page, the state must be returned immediately
+     * (once per second, lasting 10 seconds, i.e. 10 times)</strong></p>
+     * <p> <strong>2) Normal state polling, once every 5 seconds</strong></p>
      */
     @Scheduled(fixedDelay = 1000)
     public void execute() {
 
-        //1) 项目刚启动第一次执行,或者前端正在操作...(启动,停止)需要立即返回状态信息.
+        // The application has been started at the first time, or the front-end is operating start/stop, need to return status info immediately.
         if (lastTrackTime == null || !OPTIONING.isEmpty()) {
             tracking();
         } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) {
-            //2) 如果在管理端正在操作时间的10秒中之内(每秒执行一次)
+            // The last operation time is less than option interval.(10 seconds)
             tracking();
         } else if (System.currentTimeMillis() - lastTrackTime >= TRACK_INTERVAL) {
-            //3) 正常信息获取,判断本次时间和上次时间是否间隔5秒(正常监控信息获取,每5秒一次)
+            // Normal information obtain, check if there is 5 seconds interval between this time and the last time.(once every 5 seconds)
             tracking();
         }
     }
@@ -207,21 +206,20 @@ public class FlinkTrackingTask {
                     final StopFrom stopFrom = STOP_FROM_MAP.getOrDefault(key, null) == null ? StopFrom.NONE : STOP_FROM_MAP.get(key);
                     final OptionState optionState = OPTIONING.get(key);
                     try {
-                        // 1) 到flink的REST Api中查询状态
+                        // query status from flink rest api
                         assert application.getId() != null;
                         getFromFlinkRestApi(application, stopFrom);
                     } catch (Exception flinkException) {
-                        // 2) 到 YARN REST api中查询状态
+                        // query status from yarn rest api
                         try {
                             getFromYarnRestApi(application, stopFrom);
                         } catch (Exception yarnException) {
                             /*
-                              3) 从flink的restAPI和yarn的restAPI都查询失败</br>
-                              此时需要根据管理端正在操作的状态来决定是否返回最终状态,需满足:</br>
-                              1: 操作状态为为取消和正常的状态跟踪(操作状态不为STARTING)</br>
+                              Query from flink's restAPI and yarn's restAPI both failed.
+                              In this case, whether to return the final state must be decided based on the operation state currently in progress.
                              */
                             if (optionState == null || !optionState.equals(OptionState.STARTING)) {
-                                //非正在手动映射appId
+                                // non-mapping
                                 if (application.getState() != FlinkAppState.MAPPING.getValue()) {
                                     log.error("flinkTrackingTask getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint obsoleted!");
                                     if (StopFrom.NONE.equals(stopFrom)) {
@@ -233,8 +231,9 @@ public class FlinkTrackingTask {
                                     }
                                 }
                                 /*
-                                  进入到这一步说明前两种方式获取信息都失败,此步是最后一步,直接会判别任务取消或失联</br>
-                                  需清空savepoint.
+                                  Reaching this step means both of the above attempts to fetch information have failed; this is the last step,
+                                  so the task is directly marked as cancelled or lost.
+                                  The savepoint needs to be cleaned up.
                                  */
                                 cleanSavepoint(application);
                                 cleanOptioning(optionState, key);
@@ -248,7 +247,7 @@ public class FlinkTrackingTask {
                                         try {
                                             applicationService.start(application, true);
                                         } catch (Exception e) {
-                                            logError(e.getMessage(), e);
+                                            log.error(e.getMessage(), e);
                                         }
                                     }
                                 }
@@ -261,18 +260,18 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * 从flink restapi成功拿到当前任务的运行状态信息...
+     * Get the current task running status information from flink restapi
      *
-     * @param application
-     * @param stopFrom
-     * @throws Exception
+     * @param application application
+     * @param stopFrom stopFrom
      */
     private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception {
         FlinkCluster flinkCluster = getFlinkCluster(application);
         JobsOverview jobsOverview = httpJobsOverview(application, flinkCluster);
         Optional<JobsOverview.Job> optional;
         if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-            optional = jobsOverview.getJobs().size() > 1 ? jobsOverview.getJobs().stream().filter(a -> StringUtils.equals(application.getJobId(), a.getId())).findFirst() : jobsOverview.getJobs().stream().findFirst();
+            optional = jobsOverview.getJobs().size() > 1 ? jobsOverview.getJobs().stream().filter(a ->
+                StringUtils.equals(application.getJobId(), a.getId())).findFirst() : jobsOverview.getJobs().stream().findFirst();
         } else {
             optional = jobsOverview.getJobs().stream().filter(x -> x.getId().equals(application.getJobId())).findFirst();
         }
@@ -286,17 +285,16 @@ public class FlinkTrackingTask {
                     // 1) set info from JobOverview
                     handleJobOverview(application, jobOverview);
                 } catch (Exception e) {
-                    logError("get flink jobOverview error: {}", e);
+                    log.error("get flink jobOverview error: {}", e.getMessage(), e);
                 }
                 try {
                     //2) CheckPoints
                     handleCheckPoints(application);
                 } catch (Exception e) {
-                    logError("get flink checkPoints error: {}", e);
+                    log.error("get flink checkPoints error: {}", e.getMessage(), e);
                 }
                 //3) savePoint obsolete check and NEED_START check
                 OptionState optionState = OPTIONING.get(application.getId());
-                // cpu分支预测,将Running的状态单独拿出来
                 if (currentState.equals(FlinkAppState.RUNNING)) {
                     handleRunningState(application, optionState, currentState);
                 } else {
@@ -306,19 +304,14 @@ public class FlinkTrackingTask {
         }
     }
 
-    private void logError(String s, Exception e) {
-        log.error(s, e);
-    }
-
     /**
-     * 基本信息回写等处理
+     * handle job overview
      *
-     * @param application
-     * @param jobOverview
-     * @throws IOException
+     * @param application application
+     * @param jobOverview jobOverview
      */
     private void handleJobOverview(Application application, JobsOverview.Job jobOverview) throws IOException {
-        // 1) duration
+        // compute duration
         long startTime = jobOverview.getStartTime();
         long endTime = jobOverview.getEndTime();
         if (application.getStartTime() == null || startTime != application.getStartTime().getTime()) {
@@ -331,7 +324,7 @@ public class FlinkTrackingTask {
         }
         application.setDuration(jobOverview.getDuration());
 
-        // 2) overview,刚启动第一次获取Overview信息.
+        // get overview info at the first start time
         if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
             application.setTotalTask(jobOverview.getTasks().getTotal());
             application.setOverview(jobOverview.getTasks());
@@ -348,10 +341,7 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * 获取最新的checkPoint
-     *
-     * @param application
-     * @throws IOException
+     * get latest checkpoint
      */
     private void handleCheckPoints(Application application) throws Exception {
         FlinkCluster flinkCluster = getFlinkCluster(application);
@@ -362,35 +352,34 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * 当前任务正在运行,一系列状态处理.
+     * Handle running task
      *
-     * @param application
-     * @param optionState
-     * @param currentState
+     * @param application application
+     * @param optionState optionState
+     * @param currentState currentState
      */
     private void handleRunningState(Application application, OptionState optionState, FlinkAppState currentState) {
         /*
-          上次记录的状态的 "STARTING" 本次获取到最新的状态为"RUNNING",说明是重启后的第一次跟踪
-          则:job以下状态需要更新为重启状态:
-          NEED_RESTART_AFTER_CONF_UPDATE(配置文件修改后需要重新启动)
-          NEED_RESTART_AFTER_SQL_UPDATE(flink sql修改后需要重启)
-          NEED_RESTART_AFTER_ROLLBACK(任务回滚后需要重启)
-          NEED_RESTART_AFTER_DEPLOY(任务重新发布后需要回滚)
+          if the last recorded state is STARTING and the latest state obtained this time is RUNNING,
+          which means it is the first tracking after restart.
+          Then the following job launch states need to be updated to the restart state:
+          NEED_RESTART_AFTER_CONF_UPDATE (restart required after the configuration is modified)
+          NEED_RESTART_AFTER_SQL_UPDATE (restart required after the flink sql is modified)
+          NEED_RESTART_AFTER_ROLLBACK (restart required after a rollback)
+          NEED_RESTART_AFTER_DEPLOY (rollback required after a redeploy)
          */
         if (OptionState.STARTING.equals(optionState)) {
-            LaunchState launchState = LaunchState.of(application.getLaunch());
-            //如果任务更新后需要重新启动 或 发布后需要重新启动
-            switch (launchState) {
+            switch (LaunchState.of(application.getLaunch())) {
                 case NEED_RESTART:
                 case NEED_ROLLBACK:
-                    //清空需要重新启动的状态.
                     application.setLaunch(LaunchState.DONE.get());
                     break;
                 default:
                     break;
             }
         }
-        // 当前状态为running,且savePointCache里有当前任务,说明该任务正在做savepoint
+        // The current state is running, and there is a current task in the savePointCache,
+        // indicating that the task is doing savepoint
         if (SAVEPOINT_CACHE.getIfPresent(application.getId()) != null) {
             application.setOptionState(OptionState.SAVEPOINTING.getValue());
         } else {
@@ -402,12 +391,12 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * 当前任务未运行,状态处理
+     * Handle not running task
      *
-     * @param application
-     * @param optionState
-     * @param currentState
-     * @param stopFrom
+     * @param application application
+     * @param optionState optionState
+     * @param currentState currentState
+     * @param stopFrom stopFrom
      */
     private void handleNotRunState(Application application,
                                    OptionState optionState,
@@ -427,21 +416,17 @@ public class FlinkTrackingTask {
                 if (StopFrom.NONE.equals(stopFrom) || applicationService.checkAlter(application)) {
                     log.info("flinkTrackingTask getFromFlinkRestApi, job cancel is not form StreamPark,savePoint obsoleted!");
                     savePointService.obsolete(application.getId());
-                    stopCanlledJob(application.getId());
+                    stopCanceledJob(application.getId());
                     alertService.alert(application, FlinkAppState.CANCELED);
                 }
-                //清理stopFrom
                 STOP_FROM_MAP.remove(application.getId());
-                //持久化application并且移除跟踪监控
                 persistentAndClean(application);
                 cleanOptioning(optionState, application.getId());
                 break;
             case FAILED:
                 cleanSavepoint(application);
-                //清理stopFrom
                 STOP_FROM_MAP.remove(application.getId());
                 application.setState(FlinkAppState.FAILED.getValue());
-                //持久化application并且移除跟踪监控
                 persistentAndClean(application);
                 alertService.alert(application, FlinkAppState.FAILED);
                 applicationService.start(application, true);
@@ -457,18 +442,19 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * <p><strong>到 yarn中查询job的历史记录,说明flink任务已经停止,任务的最终状态为"CANCELED"</strong>
+     * <p><strong>Query the job history in yarn, indicating that the task has stopped, and the final status of the task is CANCELED</strong>
      *
-     * @param application
-     * @param stopFrom
+     * @param application application
+     * @param stopFrom stopFrom
      */
     private void getFromYarnRestApi(Application application, StopFrom stopFrom) throws Exception {
         log.debug("flinkTrackingTask getFromYarnRestApi starting...");
         OptionState optionState = OPTIONING.get(application.getId());
 
         /*
-          上一次的状态为canceling(在获取信息时flink restServer还未关闭为canceling)
-          且本次如获取不到状态(flink restServer已关闭),则认为任务已经CANCELED
+          If the status of the last time is CANCELING (flink rest server is not closed at the time of getting information)
+          and the status is not obtained this time (flink rest server is closed),
+          the task is considered CANCELED
          */
         Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
         if (flag != null) {
@@ -482,7 +468,7 @@ public class FlinkTrackingTask {
             cleanOptioning(optionState, application.getId());
             this.persistentAndClean(application);
         } else {
-            // 2)到yarn的restApi中查询状态
+            // query the status from the yarn rest Api
             AppInfo appInfo = httpYarnAppInfo(application);
             if (appInfo == null) {
                 if (!ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
@@ -508,15 +494,15 @@ public class FlinkTrackingTask {
                         flinkAppState = FlinkAppState.FINISHED;
                     }
                     application.setState(flinkAppState.getValue());
-                    //能运行到这一步,说明到YARN REST api中成功查询到信息
                     cleanOptioning(optionState, application.getId());
                     this.persistentAndClean(application);
 
-                    if (flinkAppState.equals(FlinkAppState.FAILED) || flinkAppState.equals(FlinkAppState.LOST)
+                    if (flinkAppState.equals(FlinkAppState.FAILED)
+                        || flinkAppState.equals(FlinkAppState.LOST)
                         || (flinkAppState.equals(FlinkAppState.CANCELED) && StopFrom.NONE.equals(stopFrom))
                         || applicationService.checkAlter(application)) {
                         alertService.alert(application, flinkAppState);
-                        stopCanlledJob(application.getId());
+                        stopCanceledJob(application.getId());
                         if (flinkAppState.equals(FlinkAppState.FAILED)) {
                             applicationService.start(application, true);
                         }
@@ -561,9 +547,11 @@ public class FlinkTrackingTask {
 
 
     /**
-     * <p><strong>1分钟往数据库同步一次状态</strong></p></br>
-     * <p><strong>NOTE:该操作可能会导致当程序挂了,所监控的状态没及时往数据库同步的情况,造成被监控的实际的application和数控库状态不一致的情况
-     * 但是这种操作也仅在每次程序挂和升级手动停止的情况,但是带的是减少了对数据库读写的次数,减小了数据的压力.
+     * <p><strong>Synchronize status to database once a minute</strong></p></br>
+     * <p><strong>NOTE: If the program crashes, the monitored state may not have been synchronized to the database in time,
+     * leaving the state of the monitored application inconsistent with the database. However, this can only happen when the
+     * program crashes or is stopped manually for an upgrade, and in exchange the number of database reads and writes is
+     * reduced, lowering the load on the database.
      * </strong></p>
      */
     @Scheduled(fixedDelay = 1000 * 60)
@@ -571,10 +559,8 @@ public class FlinkTrackingTask {
         TRACKING_MAP.forEach((k, v) -> persistent(v));
     }
 
-    // ===============================  static public method...  =========================================
-
     /**
-     * 设置正在操作中...
+     * set current option state
      */
     public static void setOptionState(Long appId, OptionState state) {
         if (isKubernetesApp(appId)) {
@@ -582,7 +568,6 @@ public class FlinkTrackingTask {
         }
         log.info("flinkTrackingTask setOptioning");
         OPTIONING.put(appId, state);
-        //从 streampark 停止
         if (state.equals(OptionState.CANCELLING)) {
             STOP_FROM_MAP.put(appId, StopFrom.STREAMPARK);
         }
@@ -606,10 +591,10 @@ public class FlinkTrackingTask {
     }
 
     /**
-     * 重新加载最新的application到数据库,防止如修改等操作,导致cache和实际数据库中信息不一致的问题.
+     * Reload the latest application to the database to avoid the problem of inconsistency between the data of cache and database.
      *
-     * @param appId
-     * @param callable
+     * @param appId appId
+     * @param callable callable function
      */
     public static Object refreshTracking(Long appId, Callable callable) throws Exception {
         if (isKubernetesApp(appId)) {
@@ -654,20 +639,20 @@ public class FlinkTrackingTask {
         TRACKING_MAP.remove(appId);
     }
 
-    public static void stopCanlledJob(Long appId) {
+    public static void stopCanceledJob(Long appId) {
         if (!CANCELLED_JOB_MAP.containsKey(appId)) {
             return;
         }
-        log.info("flink job canlled app appId:{} by useId:{}", appId, CANCELLED_JOB_MAP.get(appId));
+        log.info("flink job canceled app appId:{} by useId:{}", appId, CANCELLED_JOB_MAP.get(appId));
         CANCELLED_JOB_MAP.remove(appId);
     }
 
-    public static void addCanlledApp(Long appId, Long userId) {
-        log.info("flink job addCanlledApp app appId:{}, useId:{}", appId, userId);
+    public static void addCanceledApp(Long appId, Long userId) {
+        log.info("flink job addCanceledApp app appId:{}, useId:{}", appId, userId);
         CANCELLED_JOB_MAP.put(appId, userId);
     }
 
-    public static Long getCanlledJobUserId(Long appId) {
+    public static Long getCanceledJobUserId(Long appId) {
         return CANCELLED_JOB_MAP.get(appId);
     }
 
@@ -732,9 +717,6 @@ public class FlinkTrackingTask {
                     reqURL = String.format(format, application.getJobManagerUrl());
                 }
                 return yarnRestRequest(reqURL, Overview.class);
-                // TODO: yarn-session
-                //String remoteUrl = getFlinkClusterRestUrl(flinkCluster, flinkUrl);
-                //return httpGetDoResult(remoteUrl, Overview.class);
             }
         }
         return null;
@@ -753,7 +735,6 @@ public class FlinkTrackingTask {
             }
             JobsOverview jobsOverview = yarnRestRequest(reqURL, JobsOverview.class);
             if (jobsOverview != null && ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
-                //过滤出当前job
                 List<JobsOverview.Job> jobs = jobsOverview.getJobs().stream().filter(x -> x.getId().equals(application.getJobId())).collect(Collectors.toList());
                 jobsOverview.setJobs(jobs);
             }
@@ -763,7 +744,6 @@ public class FlinkTrackingTask {
                 String remoteUrl = flinkCluster.getActiveAddress().toURL() + "/" + flinkUrl;
                 JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
                 if (jobsOverview != null) {
-                    //过滤出当前job
                     List<JobsOverview.Job> jobs = jobsOverview.getJobs().stream().filter(x -> x.getId().equals(application.getJobId())).collect(Collectors.toList());
                     jobsOverview.setJobs(jobs);
                 }