You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/14 13:16:21 UTC

[incubator-streampark] branch dev updated: [improve] In the on yarn mode, when there are too many jobs, the numb… (#1602)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a7a3872e5 [improve] In the on yarn mode, when there are too many jobs, the numb… (#1602)
a7a3872e5 is described below

commit a7a3872e5e700ad3e3b9be259105caba6bf1f079
Author: macksonmu <30...@qq.com>
AuthorDate: Wed Sep 14 21:16:13 2022 +0800

    [improve] In the on yarn mode, when there are too many jobs, the numb… (#1602)
    
    * [improve] In the on yarn mode, when there are too many jobs, the number of ResourceManager connections is too high #1563
    
    Co-authored-by: mucj7 <mu...@chinaunicom.cn>
---
 .../apache/streampark/common/util/YarnUtils.scala  |  1 +
 .../src/assembly/script/data/mysql-data.sql        |  2 +-
 .../src/assembly/script/data/pgsql-data.sql        |  2 +-
 .../src/assembly/script/schema/mysql-schema.sql    |  2 ++
 .../src/assembly/script/schema/pgsql-schema.sql    |  2 ++
 .../src/assembly/script/upgrade/mysql-upgrade.sql  |  4 +++
 .../console/core/entity/Application.java           |  6 +++++
 .../console/core/entity/ApplicationLog.java        |  4 +++
 .../core/service/impl/ApplicationServiceImpl.java  |  4 +++
 .../console/core/task/FlinkTrackingTask.java       | 31 +++++++++++++++++-----
 .../resources/mapper/core/ApplicationLogMapper.xml |  1 +
 .../resources/mapper/core/ApplicationMapper.xml    |  1 +
 .../src/views/flink/app/Detail.vue                 | 17 +++++++++++-
 .../flink/submit/bean/SubmitResponse.scala         |  3 ++-
 .../flink/submit/impl/YarnApplicationSubmit.scala  |  4 ++-
 .../flink/submit/impl/YarnPerJobSubmit.scala       |  3 ++-
 16 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 5688d498d..60f1540b6 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -232,6 +232,7 @@ object YarnUtils extends Logger {
     if (url == null) return null
 
     def request(url: String): String = {
+      logDebug("request url is " + url);
       val config = RequestConfig.custom.setConnectTimeout(5000).build
       if (hasYarnHttpKerberosAuth) {
         HadoopUtils.getUgi().doAs(new PrivilegedExceptionAction[String] {
diff --git a/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql b/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql
index 156f07aec..ae017f929 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql
@@ -21,7 +21,7 @@ set foreign_key_checks = 0;
 -- ----------------------------
 -- Records of t_flink_app
 -- ----------------------------
-insert into `t_flink_app` values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, 0, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
+insert into `t_flink_app` values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, 0, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
 
 -- ----------------------------
 -- Records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql b/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql
index 83d1a3fb8..6848ae5c7 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql
@@ -18,7 +18,7 @@
 -- ----------------------------
 -- records of t_flink_app
 -- ----------------------------
-insert into "public"."t_flink_app" values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, false, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, true, null, null, null, null, null, null, false, null, null, null, 'streampark,test');
+insert into "public"."t_flink_app" values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, false, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, true, null, null, null, null, null, null, false, null, null, null, 'streampark,test');
 
 -- ----------------------------
 -- records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
index 3e805f692..5e3ef9e79 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
@@ -77,6 +77,7 @@ create table `t_flink_app` (
   `app_type` tinyint default null,
   `duration` bigint default null,
   `job_id` varchar(64) collate utf8mb4_general_ci default null,
+  `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
   `version_id` bigint default null,
   `cluster_id` varchar(255) collate utf8mb4_general_ci default null,
   `k8s_namespace` varchar(255) collate utf8mb4_general_ci default null,
@@ -182,6 +183,7 @@ create table `t_flink_log` (
   `id` bigint not null auto_increment,
   `app_id` bigint default null,
   `yarn_app_id` varchar(50) collate utf8mb4_general_ci default null,
+  `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
   `success` tinyint default null,
   `exception` text collate utf8mb4_general_ci,
   `option_time` datetime default null,
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
index b596b9e58..025960e00 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
@@ -214,6 +214,7 @@ create table "public"."t_flink_app" (
   "app_type" int2,
   "duration" int8,
   "job_id" varchar(64) collate "pg_catalog"."default",
+  "job_manager_url" varchar(255) collate "pg_catalog"."default",
   "version_id" int8,
   "cluster_id" varchar(255) collate "pg_catalog"."default",
   "k8s_namespace" varchar(255) collate "pg_catalog"."default",
@@ -407,6 +408,7 @@ create table "public"."t_flink_log" (
   "id" int8 not null default nextval('streampark_t_flink_log_id_seq'::regclass),
   "app_id" int8,
   "yarn_app_id" varchar(50) collate "pg_catalog"."default",
+  "job_manager_url" varchar(255) collate "pg_catalog"."default",
   "success" int2,
   "exception" text collate "pg_catalog"."default",
   "option_time" timestamp(6)
diff --git a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql
index 18bcac658..59dcedbc8 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql
@@ -128,6 +128,10 @@ alter table `t_flink_app` add column `default_mode_ingress` text collate utf8mb4
 alter table `t_flink_app` add column `modify_time` datetime not null default current_timestamp on update current_timestamp after create_time;
 -- add tags field
 alter table `t_flink_app` add column `tags` varchar(500) default null;
+-- add job_manager_url field to t_flink_app
+alter table `t_flink_app` add column `job_manager_url` varchar(255) default null after `job_id`;
+-- add job_manager_url field to t_flink_log
+alter table `t_flink_log` add column `job_manager_url` varchar(255) default null after `yarn_app_id`;
 
 alter table `t_flink_project`
 change column `date` `create_time` datetime default null,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 7264e0363..88c78c051 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -99,6 +99,12 @@ public class Application implements Serializable {
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String jobId;
 
+    /**
+     * The address of the jobmanager, that is, the direct access address of the Flink web UI
+     */
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
+    private String jobManagerUrl;
+
     /**
      * flink version
      */
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
index f7ef677a8..22e0164b3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
@@ -40,6 +40,10 @@ public class ApplicationLog {
      * applicationId
      */
     private String yarnAppId;
+    /**
+     * The address of the jobmanager, that is, the direct access address of the Flink web UI
+     */
+    private String jobManagerUrl;
     /**
      * start status
      */
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 4c8c10693..3dee66b95 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1379,6 +1379,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
                     application.setJobId(submitResponse.jobId());
                 }
+                if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
+                    application.setJobManagerUrl(submitResponse.jobManagerUrl());
+                    applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
+                }
                 application.setFlameGraph(appParam.getFlameGraph());
                 applicationLog.setYarnAppId(submitResponse.clusterId());
                 application.setStartTime(new Date());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 84ad10a68..62f7c53d8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -724,8 +724,14 @@ public class FlinkTrackingTask {
         if (appId != null) {
             if (application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION) ||
                 application.getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
-                String format = "proxy/%s/overview";
-                String reqURL = String.format(format, appId);
+                String reqURL;
+                if (StringUtils.isEmpty(application.getJobManagerUrl())) {
+                    String format = "proxy/%s/overview";
+                    reqURL = String.format(format, appId);
+                } else {
+                    String format = "%s/overview";
+                    reqURL = String.format(format, application.getJobManagerUrl());
+                }
                 return yarnRestRequest(reqURL, Overview.class);
                 // TODO: yarn-session
                 //String remoteUrl = getFlinkClusterRestUrl(flinkCluster, flinkUrl);
@@ -738,8 +744,14 @@ public class FlinkTrackingTask {
     private JobsOverview httpJobsOverview(Application application, FlinkCluster flinkCluster) throws Exception {
         final String flinkUrl = "jobs/overview";
         if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-            String format = "proxy/%s/" + flinkUrl;
-            String reqURL = String.format(format, application.getAppId());
+            String reqURL;
+            if (StringUtils.isEmpty(application.getJobManagerUrl())) {
+                String format = "proxy/%s/" + flinkUrl;
+                reqURL = String.format(format, application.getAppId());
+            } else {
+                String format = "%s/" + flinkUrl;
+                reqURL = String.format(format, application.getJobManagerUrl());
+            }
             JobsOverview jobsOverview = yarnRestRequest(reqURL, JobsOverview.class);
             if (jobsOverview != null && ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
                 //过滤出当前job
@@ -765,8 +777,14 @@ public class FlinkTrackingTask {
     private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkCluster) throws IOException {
         final String flinkUrl = "jobs/%s/checkpoints";
         if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
-            String format = "proxy/%s/" + flinkUrl;
-            String reqURL = String.format(format, application.getAppId(), application.getJobId());
+            String reqURL;
+            if (StringUtils.isEmpty(application.getJobManagerUrl())) {
+                String format = "proxy/%s/" + flinkUrl;
+                reqURL = String.format(format, application.getAppId(), application.getJobId());
+            } else {
+                String format = "%s/" + flinkUrl;
+                reqURL = String.format(format, application.getJobManagerUrl(), application.getJobId());
+            }
             return yarnRestRequest(reqURL, CheckPoints.class);
         } else if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
             if (application.getJobId() != null) {
@@ -792,5 +810,4 @@ public class FlinkTrackingTask {
         }
         return JacksonUtils.read(result, clazz);
     }
-
 }
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
index ab0a4edeb..b53b80e95 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
@@ -24,6 +24,7 @@
         <id column="id" jdbcType="BIGINT" property="id"/>
         <result column="job_id" jdbcType="BIGINT" property="jobId"/>
         <result column="yarn_app_id" jdbcType="VARCHAR" property="yarnAppId"/>
+        <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
         <result column="success" jdbcType="INTEGER" property="success"/>
         <result column="exception" jdbcType="LONGVARCHAR" property="exception"/>
         <result column="option_time" jdbcType="DATE" property="optionTime"/>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index b110fd6f2..b73ef2a5b 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -41,6 +41,7 @@
         <result column="jar_check_sum" jdbcType="VARCHAR" property="jarCheckSum"/>
         <result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
         <result column="job_id" jdbcType="VARCHAR" property="jobId"/>
+        <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
         <result column="user_id" jdbcType="BIGINT" property="userId"/>
         <result column="start_time" jdbcType="DATE" property="startTime"/>
         <result column="end_time" jdbcType="DATE" property="endTime"/>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
index 4a7a4d80b..fd8893aa3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
@@ -485,6 +485,11 @@
                 slot-scope="text, record">
                 <span class="pointer" @click="handleView(record.yarnAppId)">{{ record.yarnAppId }}</span>
               </template>
+              <template
+                slot="jobManagerUrl"
+                slot-scope="text, record">
+                <span class="pointer" @click="handleJobManagerUrlView(record.jobManagerUrl)">{{ record.jobManagerUrl }}</span>
+              </template>
               <template
                 slot="optionTime"
                 slot-scope="text, record">
@@ -967,9 +972,15 @@ export default {
           {
             title: 'Application Id',
             dataIndex: 'yarnAppId',
-            width: '40%',
+            width: '20%',
             scopedSlots: {customRender: 'yarnAppId'}
           },
+          {
+            title: 'JobManager URL',
+            dataIndex: 'jobManagerUrl',
+            width: '25%',
+            scopedSlots: {customRender: 'jobManagerUrl'}
+          },
           {
             title: 'Start Status',
             dataIndex: 'success',
@@ -1159,6 +1170,10 @@ export default {
       }
     },
 
+    handleJobManagerUrlView() {
+      window.open(this.app.jobManagerUrl)
+    },
+
     handleSavePoint() {
       const params = {
         appId: this.app.id
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
index 2ff47dca5..5ce981a63 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
@@ -22,6 +22,7 @@ import javax.annotation.Nullable
 
 case class SubmitResponse(clusterId: String,
                           flinkConfig: JavaMap[String, String],
-                          @Nullable jobId: String = "") {
+                          @Nullable jobId: String = "",
+                          @Nullable jobManagerUrl: String = "") {
 
 }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
index 5ee29c76b..278c271c3 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
@@ -121,8 +121,10 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
 
           val applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig)
           var applicationId: ApplicationId = null
+          var jobManagerUrl: String = null
           clusterClient = clusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration).getClusterClient
           applicationId = clusterClient.getClusterId
+          jobManagerUrl = clusterClient.getWebInterfaceURL;
           logInfo(
             s"""
                |-------------------------<<applicationId>>------------------------
@@ -130,7 +132,7 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
                |__________________________________________________________________
                |""".stripMargin)
 
-          SubmitResponse(applicationId.toString, flinkConfig.toMap)
+          SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl);
         } finally {
           Utils.close(clusterDescriptor, clusterClient)
         }
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
index 410e927e0..664954030 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
@@ -101,6 +101,7 @@ object YarnPerJobSubmit extends YarnSubmitTrait {
 
       }
       val applicationId = clusterClient.getClusterId
+      val jobManagerUrl = clusterClient.getWebInterfaceURL
       logInfo(
         s"""
            |-------------------------<<applicationId>>------------------------
@@ -108,7 +109,7 @@ object YarnPerJobSubmit extends YarnSubmitTrait {
            |__________________________________________________________________
            |""".stripMargin)
 
-      SubmitResponse(applicationId.toString, flinkConfig.toMap)
+      SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl)
     } finally {
       if (submitRequest.safePackageProgram) {
         Utils.close(packagedProgram)