Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/11/26 03:40:33 UTC

[GitHub] [incubator-streampark] wolfboys opened a new pull request, #2100: [Bug][WIP] flink cluster management bug fixed

wolfboys opened a new pull request, #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100

   [Bug][WIP] flink cluster management bug fixed
   
   ## What changes were proposed in this pull request
   
   1) Add a remote-cluster connection check
   2) Add a yarn-session-cluster connection check
   3) Simplify the add-cluster parameter settings
   
   Issue Number: close #xxx <!-- REMOVE this line if no issue to close -->
   
   <!--(For example: This pull request proposed to add checkstyle plugin).-->
   
   ## Brief change log
   
   <!--*(for example:)*
   - *Add maven-checkstyle-plugin to root pom.xml*
   -->
   
   ## Verifying this change
   
   <!--*(Please pick either of the following options)*-->
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
   - *Added integration tests for end-to-end.*
   - *Added *Test to verify the change.*
   - *Manually verified the change by testing locally.* -->
   
   ## Does this pull request potentially affect one of the following parts
    - Dependencies (does it add or upgrade a dependency): no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] MonsterChenzhuo merged pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo merged PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100




[GitHub] [incubator-streampark] 1996fanrui commented on pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1329209516

   > > cluster run status bug fixed
   > 
   > What is the bug? Please update the PR description and commit message to describe it, so that developers, reviewers, and other contributors can easily tell what the PR does.
   
   Hi @wolfboys , please help take a look, thanks~




[GitHub] [incubator-streampark] MonsterChenzhuo commented on pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1330838630

   LGTM




[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033699293


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   > As I understand it, a yarn session cluster is a YARN job. It has just one JobManager, and when the old JM crashes, YARN will start a new JM, right?
   > 
   > If so, I don't understand the HA address.
   
   I don't want to discuss this part at length. If you set a single URL here, it corresponds to a single YARN node; once that node goes down, no further work can be done.





[GitHub] [incubator-streampark] wolfboys commented on pull request #2100: [Bug][WIP] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1328157225

   cc @macksonmu @xujiangfeng001 PTAL thanks




[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033057938


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java:
##########
@@ -57,4 +57,6 @@ public interface ApplicationMapper extends BaseMapper<Application> {
     Boolean existsByJobName(@Param("jobName") String jobName);
 
     List<Application> getByProjectId(@Param("projectId") Long id);
+
+    boolean existsRunningJobByClusterId(@Param("clusterId")Long clusterId);

Review Comment:
   missing space before `Long`



##########
streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml:
##########
@@ -98,6 +99,14 @@
         limit 1
     </select>
 
+    <select id="existsRunningJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+        select count(1)
+        from t_flink_app
+        where flink_cluster_id = #{clusterId}

Review Comment:
   I don't understand: what is the difference between `cluster_id` and `flink_cluster_id`?





[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033078241


##########
streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts:
##########
@@ -41,7 +41,6 @@ export default {
   podTemplate: 'Kubernetes Pod Template',
   flinkCluster: 'Flink Cluster',
   yarnQueue: 'Yarn Queue',
-  yarnSessionClusterId: 'Yarn Session ClusterId',

Review Comment:
   When adding an existing yarn session cluster, many users may not know what `address` and `Yarn Session Cluster` mean. I recommend adding a hint: for example, `address` is the JobManager address, such as http://hh-hx199w:35154, and `Yarn Session Cluster` is a value similar to application_1656423003647_19237.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   Why is there a loop here?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -519,6 +529,21 @@ public boolean existsByTeamId(Long teamId) {
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (!exists) {
+            for (Application application:FlinkTrackingTask.getAllTrackingApp().values()) {

Review Comment:
   There are no spaces around the colon; please add them.





[GitHub] [incubator-streampark] 1996fanrui commented on pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1328934726

   > cluster run status bug fixed
   
   What is the bug? Please update the PR description and commit message to describe it, so that developers, reviewers, and other contributors can easily tell what the PR does.




[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033663781


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
             }
+            return false;
         }
         return false;

Review Comment:
   > From your reply, I think they can be simplified. I still don't understand why you don't simplify them.
   
   For example, suppose the address is: http://192.168.0.1,xxyy:a.b.c,192.168.1.2
   
   ```
   String[] array = address.split(",");
   for (String url : array) {
       try {
           new URI(url);
       } catch (Exception ignored) {
           return false;
       }
       try {
           String restUrl;
           if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
               restUrl = url + "/overview";
           } else {
               restUrl = url + "/proxy/" + this.clusterId + "/overview";
           }
           String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
           JacksonUtils.read(result, Overview.class);
           return true;
       } catch (Exception ignored) {
           //
       }
   }
   ```
   
   When `xxyy:a.b.c` is parsed, the first try..catch returns false; the second try..catch only needs to find one working connection and then returns true.
   





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033497510


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   > Same question: isn't the address the JM URL?
   > 
   > If the address is the JM URL, why is it a list?
   
   For example:
   1) If the cluster is a Flink standalone cluster, the address is a set of addresses.
   2) If the cluster is a yarn-session cluster, the address is the set of YARN HA addresses.
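
The reply above treats `address` as a comma-separated list in which any one live entry is enough. A minimal sketch of that failover scan follows, assuming a hypothetical `AddressFailover` helper and an injected `probe` predicate that stands in for the real HTTP check; neither name comes from StreamPark.

```java
import java.util.function.Predicate;

// Hypothetical helper, not StreamPark code: scans a comma-separated
// address list and reports whether any entry answers the probe.
class AddressFailover {

    // `probe` is an assumption standing in for the real HTTP check
    // (for example a GET on the REST "/overview" endpoint). It is
    // injected so that the sketch runs without any network access.
    static boolean anyReachable(String addresses, Predicate<String> probe) {
        if (addresses == null || addresses.trim().isEmpty()) {
            return false;
        }
        for (String raw : addresses.split(",")) {
            String url = raw.trim();
            if (url.isEmpty()) {
                continue; // skip empty entries, e.g. "a,,b"
            }
            try {
                if (probe.test(url)) {
                    return true; // the first reachable address wins
                }
            } catch (RuntimeException ignored) {
                // A failing node must not stop the scan; try the next address.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated cluster in which only the second address is alive.
        Predicate<String> onlySecond = url -> url.equals("http://192.168.0.2:8081");
        System.out.println(
            anyReachable("http://192.168.0.1:8081, http://192.168.0.2:8081", onlySecond));
    }
}
```

With a single configured address the loop degenerates to one probe, which is the situation the reviewer describes; with several, a dead first node no longer makes the whole cluster look unavailable.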





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033676148


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -519,6 +529,26 @@ public boolean existsByTeamId(Long teamId) {
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (!exists) {
+            for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
+                if (clusterId.equals(application.getFlinkClusterId())) {
+                    if (FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return exists;
+    }

Review Comment:
   > This is a bad case of code style. You can take a look at the Flink code style guide in your free time; it is useful for keeping StreamPark's code quality high.
   > 
   > https://flink.apache.org/contributing/code-style-and-quality-common.html
   > 
   > <img alt="image" width="1032" src="https://user-images.githubusercontent.com/38427477/204298610-923c0359-d9d4-4f49-9718-8a15da59bf1a.png">
   > 
   > <img alt="image" width="733" src="https://user-images.githubusercontent.com/38427477/204299004-b44d768f-cfba-4c96-b861-19d11fd97d5a.png">
   
   Hi fanrui,
   
   Thanks for your feedback. If you read the code carefully, you will find that the situation is different from what you describe.
   
   Here is the improved code:
   
   ```
   @Override
   public boolean existsRunningJobByClusterId(Long clusterId) {
       boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
       if (!exists) {
           for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
               if (clusterId.equals(application.getFlinkClusterId())
                   && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
                   return true;
               }
           }
       }
       return exists;
   }
   ```
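
The same check can also be written as a single boolean expression. The sketch below is only an illustration under stated assumptions: `RunningJobCheck` and its nested `App` class are hypothetical stand-ins for the StreamPark service and `Application` entity, `existsInDb` plays the role of the mapper result, and `tracked` plays the role of `FlinkTrackingTask.getAllTrackingApp().values()`.

```java
import java.util.Collection;
import java.util.Objects;

// Hypothetical sketch, not StreamPark code.
class RunningJobCheck {

    // Trimmed-down stand-in for the Application entity.
    static final class App {
        final Long flinkClusterId;
        final String state;

        App(Long flinkClusterId, String state) {
            this.flinkClusterId = flinkClusterId;
            this.state = state;
        }
    }

    // Returns true if the database already reports a running job, or if
    // any tracked application on this cluster is in the RUNNING state.
    static boolean existsRunningJob(Long clusterId, boolean existsInDb, Collection<App> tracked) {
        return existsInDb
            || tracked.stream().anyMatch(app ->
                Objects.equals(clusterId, app.flinkClusterId)
                    && "RUNNING".equals(app.state));
    }
}
```

`Objects.equals` is used here so that a null `clusterId` yields `false` instead of a `NullPointerException`; whether that matters depends on how the real method is called.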
   





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033441186


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }

Review Comment:
   Can the two `try ... catch` blocks be merged into one?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   Same question: isn't the address the JM URL?
   
   If the address is the JM URL, why is it a list?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
             }
+            return false;
         }
         return false;

Review Comment:
   Can the three `return false` statements be reduced to one?
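
One possible shape for the single-exit version the reviewer asks about is sketched below. It is not the code that was merged: `ClusterCheck`, its nested `Mode` enum, and the injected `probe` predicate are hypothetical stand-ins for `FlinkCluster`, `ExecutionMode`, and the `HttpClientUtils`/`JacksonUtils` calls in the diff.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.function.Predicate;

// Hypothetical sketch, not StreamPark code.
class ClusterCheck {

    enum Mode { REMOTE, YARN_SESSION, OTHER }

    // Builds the REST URL the same way the diff does for each mode.
    static String restUrl(Mode mode, String url, String clusterId) {
        return mode == Mode.REMOTE
            ? url + "/overview"
            : url + "/proxy/" + clusterId + "/overview";
    }

    // `probe` stands in for "GET the URL and parse the overview JSON".
    static boolean verify(Mode mode, String address, String clusterId, Predicate<String> probe) {
        boolean supported = mode == Mode.REMOTE || mode == Mode.YARN_SESSION;
        if (supported && address != null) {
            for (String url : address.split(",")) {
                try {
                    new URI(url); // syntax check only, no network access
                } catch (URISyntaxException e) {
                    break; // same outcome as the early `return false` in the diff
                }
                try {
                    if (probe.test(restUrl(mode, url, clusterId))) {
                        return true;
                    }
                } catch (RuntimeException ignored) {
                    // Unreachable node: fall through to the next address.
                }
            }
        }
        return false; // the only failure exit
    }
}
```

The guard condition replaces the two outer `return false` statements, and `break` replaces the one inside the loop, so every failure path reaches the same final line.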





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033083311


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   > Why is there a loop here?
   
   For example, the address may be a set of addresses, such as: 192.168.0.1, 192.168.0.2, 192.168.0.3





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033084078


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -519,6 +529,21 @@ public boolean existsByTeamId(Long teamId) {
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (!exists) {
+            for (Application application:FlinkTrackingTask.getAllTrackingApp().values()) {

Review Comment:
   > There are no spaces around the colon; please add them.
   
   Thanks for your review. I will fix it later.





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033665925


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
             }
+            return false;
         }
         return false;

Review Comment:
   > > From your reply, I think they can be simplified. I still don't understand why you don't simplify them.
   > 
   > e.g. the address is: http://192.168.0.1,xxyy:a.b.c,192.168.1.2
   > 
   > ```
   >   String[] array = address.split(",");
   >           for (String url : array) {
   >               try {
   >                   new URI(url);
   >               } catch (Exception ignored) {
   >                   return false;
   >               }
   >               try {
   >                   String restUrl;
   >                   if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
   >                       restUrl = url + "/overview";
   >                   } else {
   >                       restUrl = url + "/proxy/" + this.clusterId + "/overview";
   >                   }
   >                   String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
   >                   JacksonUtils.read(result, Overview.class);
   >                   return true;
   >               } catch (Exception ignored) {
   >                   //
   >               }
   >           }
   > ```
   > 
   > When `xxyy:a.b.c` is parsed, the first try..catch returns immediately; the second only needs to find one address whose connection is ok, and then it returns.
   
   The first try...catch checks every url, but the second one does not.





[GitHub] [incubator-streampark] wolfboys commented on pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1329292058

   1) add remote-cluster connection check
   2) add yarn-session-cluster connection check
   3) simplify add cluster parameter settings
   4) cluster run status bug fixed
   
   issue: #2091, #2092 




[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033068455


##########
streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml:
##########
@@ -98,6 +99,14 @@
         limit 1
     </select>
 
+    <select id="existsRunningJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+        select count(1)
+        from t_flink_app
+        where flink_cluster_id = #{clusterId}

Review Comment:
   > I didn't understand, what is the difference between `cluster_id` and `flink_cluster_id`
   
   cluster_id: the external cluster identifier, such as the applicationId of a yarn-session or the k8s cluster id
   flink_cluster_id: the id of a cluster managed by StreamPark, taken from the id field of t_flink_cluster
   





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033586457


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   As I understand it, a yarn session cluster is a yarn job. It has just one JobManager, and when the old JM crashes, yarn will start a new JM, right?
   
   If so, I don't understand the purpose of the HA address.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -519,6 +529,26 @@ public boolean existsByTeamId(Long teamId) {
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (!exists) {
+            for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
+                if (clusterId.equals(application.getFlinkClusterId())) {
+                    if (FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return exists;
+    }

Review Comment:
   This is a bad case of code style. You can take a look at the Flink code style guide in your free time; it's useful for keeping StreamPark's code high quality.
   
   https://flink.apache.org/contributing/code-style-and-quality-common.html
   
   <img width="1032" alt="image" src="https://user-images.githubusercontent.com/38427477/204298610-923c0359-d9d4-4f49-9718-8a15da59bf1a.png">
   
   <img width="733" alt="image" src="https://user-images.githubusercontent.com/38427477/204299004-b44d768f-cfba-4c96-b861-19d11fd97d5a.png">
   
   



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
             }
+            return false;
         }
         return false;

Review Comment:
   From your reply, I think they can be simplified. I still don't understand why you don't simplify them.





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033606592


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -164,11 +181,14 @@ public Map<String, String> getFlinkConfig() throws MalformedURLException, JsonPr
         URI activeAddress = this.getActiveAddress();
         String restUrl = activeAddress.toURL() + "/jobmanager/config";
         String json = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
-        List<Map<String, String>> confList = JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() {
-        });
-        Map<String, String> config = new HashMap<>(0);
-        confList.forEach(k -> config.put(k.get("key"), k.get("value")));
-        return config;
+        if (StringUtils.isNotEmpty(json)) {
+            List<Map<String, String>> confList = JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() {
+            });
+            Map<String, String> config = new HashMap<>(0);
+            confList.forEach(k -> config.put(k.get("key"), k.get("value")));
+            return config;
+        }
+        return Collections.emptyMap();

Review Comment:
   ```suggestion
           if (StringUtils.isEmpty(json)) {
               return Collections.emptyMap();
           }
           
           List<Map<String, String>> confList = JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() {
           });
           Map<String, String> config = new HashMap<>(0);
           confList.forEach(k -> config.put(k.get("key"), k.get("value")));
           return config;
   ```





[GitHub] [incubator-streampark] macksonmu commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
macksonmu commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033078927


##########
streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml:
##########
@@ -98,6 +99,14 @@
         limit 1
     </select>
 
+    <select id="existsRunningJobByClusterId" resultType="java.lang.Boolean" parameterType="java.lang.Long">
+        select count(1)
+        from t_flink_app
+        where flink_cluster_id = #{clusterId}

Review Comment:
   OK, I got it.





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033504187


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
             }
+            return false;
         }
         return false;

Review Comment:
   The first `try..catch` ensures that every address in the group is a legitimate url; if any of them is invalid, the method returns immediately.
   The second `try...catch` looks for an active, connectable address within the group.
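   
   The two steps described above can be sketched as two separate passes. This is only an illustration, not the code in this PR: the class name `AddressGroupCheck` and the `probe` predicate (standing in for the real HTTP call to `/overview`) are hypothetical. Note that it behaves slightly differently from the single loop in the diff, which can return `true` for an earlier reachable address before a later malformed one is ever parsed.
   
   ```java
   import java.net.URI;
   import java.util.function.Predicate;
   
   // Hypothetical helper for illustration; not part of StreamPark.
   class AddressGroupCheck {
   
       // Pass 1: every address in the comma-separated group must parse as a URI.
       // Pass 2: at least one address must pass the connection probe.
       static boolean verify(String address, Predicate<String> probe) {
           if (address == null) {
               return false;
           }
           String[] urls = address.split(",");
           for (String url : urls) {
               try {
                   new URI(url);
               } catch (Exception e) {
                   // One malformed address invalidates the whole group.
                   return false;
               }
           }
           for (String url : urls) {
               if (probe.test(url)) {
                   return true;
               }
           }
           return false;
       }
   
       public static void main(String[] args) {
           // Stand-in for the HTTP check: pretend only host-b answers.
           Predicate<String> onlyHostB = url -> url.equals("http://host-b:8081");
           System.out.println(verify("http://host-a:8081,http://host-b:8081", onlyHostB));
           System.out.println(verify("http://host-b:8081,not a url", onlyHostB));
       }
   }
   ```
   
   Passing the probe in as a predicate keeps the sketch runnable without a cluster; in the real method it would wrap the `HttpClientUtils.httpGetRequest` call shown in the diff.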





[GitHub] [incubator-streampark] wolfboys commented on pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1329366124

   cc @1996fanrui:
   
   Thanks for your review and great comments. The latest code has been submitted; please review again when you have time. Thanks very much!




[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033684545


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -519,6 +529,26 @@ public boolean existsByTeamId(Long teamId) {
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (!exists) {
+            for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
+                if (clusterId.equals(application.getFlinkClusterId())) {
+                    if (FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return exists;
+    }

Review Comment:
   How about this?
   
   ```
       @Override
       public boolean existsRunningJobByClusterId(Long clusterId) {
           boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
           if (exists) {
               return true;
           }
   
           for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
               if (clusterId.equals(application.getFlinkClusterId())
                   && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
                   return true;
               }
           }
           return false;
       }
   ```





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033701303


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {
+                try {
+                    new URI(url);
+                } catch (Exception ignored) {
+                    return false;
+                }
+                try {
+                    String restUrl;
+                    if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
+                        restUrl = url + "/overview";
+                    } else {
+                        restUrl = url + "/proxy/" + this.clusterId + "/overview";
+                    }
+                    String result = HttpClientUtils.httpGetRequest(restUrl, RequestConfig.custom().setConnectTimeout(2000).build());
+                    JacksonUtils.read(result, Overview.class);
+                    return true;
+                } catch (Exception ignored) {
+                    //
+                }
             }
+            return false;
         }
         return false;

Review Comment:
   > After reading this code 3 times, I think I understand.
   > 
   > The first `try catch` returns directly when any url isn't legal, right? For example, if one url isn't legal, the next url won't be checked.
   > 
   > If that's right, I think the code should be refactored. It's very hard to read; I had to read it several times to understand it. 😭
   > 
   > How about this? `isLegalUrl` is a util method.
   > 
   > ```
   > for (String url : array) {
   >   if(!isLegalUrl(url)) {
   >     return false;
   >   }
   >   // try to connect yarn ...
   > }
   > ```
   > 
   > If it's still wrong, please correct me.
   
   ok, good. 👍
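   
   For reference, one possible shape for the `isLegalUrl` util method is sketched below. The body is an assumption, since the thread only names the method: the class name `UrlChecks` and the rule "absolute http/https URI with a host" are hypothetical choices. They are stricter than `new URI(url)` alone, which also accepts strings such as `xxyy:a.b.c` (parsed as an opaque URI) and `192.168.1.2` (parsed as a relative reference).
   
   ```java
   import java.net.URI;
   import java.net.URISyntaxException;
   
   // Hypothetical utility for illustration; not part of StreamPark.
   class UrlChecks {
   
       // Accepts only absolute http/https URIs that carry a host.
       static boolean isLegalUrl(String url) {
           if (url == null || url.trim().isEmpty()) {
               return false;
           }
           try {
               URI uri = new URI(url.trim());
               String scheme = uri.getScheme();
               boolean isHttp = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
               return isHttp && uri.getHost() != null;
           } catch (URISyntaxException e) {
               return false;
           }
       }
   
       public static void main(String[] args) {
           System.out.println(isLegalUrl("http://192.168.0.1")); // true
           System.out.println(isLegalUrl("xxyy:a.b.c"));         // false: scheme is not http/https
           System.out.println(isLegalUrl("192.168.1.2"));        // false: no scheme
       }
   }
   ```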





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033709051


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java:
##########
@@ -138,23 +144,34 @@ public URI getActiveAddress() {
         return null;
     }
 
-    public boolean verifyConnection() {
-        if (address == null) {
-            return false;
-        }
-        String[] array = address.split(",");
-        for (String url : array) {
-            try {
-                new URI(url);
-            } catch (Exception ignored) {
+    public boolean verifyClusterConnection() {
+        if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum()) ||
+            ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
+            if (address == null) {
                 return false;
             }
-            try {
-                HttpClientUtils.httpGetRequest(url, RequestConfig.custom().setConnectTimeout(2000).build());
-                return true;
-            } catch (Exception ignored) {
-                //
+            String[] array = address.split(",");
+            for (String url : array) {

Review Comment:
   > It has just one JobManager, and when the old JM crashes, yarn will start a new JM, right?
   
   As I understand it, yarn will start a new JM after the old JM crashes. So I wonder how the HA address is obtained. Maybe I missed some background.





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#discussion_r1033747386


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -519,6 +529,26 @@ public boolean existsByTeamId(Long teamId) {
         return baseMapper.existsByTeamId(teamId);
     }
 
+    @Override
+    public boolean existsRunningJobByClusterId(Long clusterId) {
+        boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
+        if (!exists) {
+            for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
+                if (clusterId.equals(application.getFlinkClusterId())) {
+                    if (FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return exists;
+    }

Review Comment:
   > How about this?
   > 
   > ```
   >     @Override
   >     public boolean existsRunningJobByClusterId(Long clusterId) {
   >         boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
   >         if (exists) {
   >             return true;
   >         }
   > 
   >         for (Application application : FlinkTrackingTask.getAllTrackingApp().values()) {
   >             if (clusterId.equals(application.getFlinkClusterId())
   >                 && FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
   >                 return true;
   >             }
   >         }
   >         return false;
   >     }
   > ```
   
   I have also pushed my change; it seems the code is exactly the same as this 😅





[GitHub] [incubator-streampark] wolfboys commented on pull request #2100: [Bug] flink cluster management bug fixed

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2100:
URL: https://github.com/apache/incubator-streampark/pull/2100#issuecomment-1329292859

   > > > cluster run status bug fixed
   > > 
   > > 
   > > What's the bug? Please also update the PR description and commit message. It's better to describe the bug, so that developers, reviewers and other contributors can easily see what the PR does.
   > 
   > Hi @wolfboys , please help take a look, thanks~
   
   1) add remote-cluster connection check
   2) add yarn-session-cluster connection check
   3) simplify add cluster parameter settings
   4) cluster run status bug fixed
   
   issue: https://github.com/apache/incubator-streampark/issues/2091, https://github.com/apache/incubator-streampark/issues/2092

