You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/14 13:16:21 UTC
[incubator-streampark] branch dev updated: [improve] In the on yarn mode, when there are too many jobs, the numb… (#1602)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a7a3872e5 [improve] In the on yarn mode, when there are too many jobs, the numb… (#1602)
a7a3872e5 is described below
commit a7a3872e5e700ad3e3b9be259105caba6bf1f079
Author: macksonmu <30...@qq.com>
AuthorDate: Wed Sep 14 21:16:13 2022 +0800
[improve] In the on yarn mode, when there are too many jobs, the numb… (#1602)
* [improve] In the on yarn mode, when there are too many jobs, the number of ResourceManager connections is too high #1563
Co-authored-by: mucj7 <mu...@chinaunicom.cn>
---
.../apache/streampark/common/util/YarnUtils.scala | 1 +
.../src/assembly/script/data/mysql-data.sql | 2 +-
.../src/assembly/script/data/pgsql-data.sql | 2 +-
.../src/assembly/script/schema/mysql-schema.sql | 2 ++
.../src/assembly/script/schema/pgsql-schema.sql | 2 ++
.../src/assembly/script/upgrade/mysql-upgrade.sql | 4 +++
.../console/core/entity/Application.java | 6 +++++
.../console/core/entity/ApplicationLog.java | 4 +++
.../core/service/impl/ApplicationServiceImpl.java | 4 +++
.../console/core/task/FlinkTrackingTask.java | 31 +++++++++++++++++-----
.../resources/mapper/core/ApplicationLogMapper.xml | 1 +
.../resources/mapper/core/ApplicationMapper.xml | 1 +
.../src/views/flink/app/Detail.vue | 17 +++++++++++-
.../flink/submit/bean/SubmitResponse.scala | 3 ++-
.../flink/submit/impl/YarnApplicationSubmit.scala | 4 ++-
.../flink/submit/impl/YarnPerJobSubmit.scala | 3 ++-
16 files changed, 74 insertions(+), 13 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 5688d498d..60f1540b6 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -232,6 +232,7 @@ object YarnUtils extends Logger {
if (url == null) return null
def request(url: String): String = {
+ logDebug("request url is " + url);
val config = RequestConfig.custom.setConnectTimeout(5000).build
if (hasYarnHttpKerberosAuth) {
HadoopUtils.getUgi().doAs(new PrivilegedExceptionAction[String] {
diff --git a/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql b/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql
index 156f07aec..ae017f929 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/data/mysql-data.sql
@@ -21,7 +21,7 @@ set foreign_key_checks = 0;
-- ----------------------------
-- Records of t_flink_app
-- ----------------------------
-insert into `t_flink_app` values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, 0, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
+insert into `t_flink_app` values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, 0, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test');
-- ----------------------------
-- Records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql b/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql
index 83d1a3fb8..6848ae5c7 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/data/pgsql-data.sql
@@ -18,7 +18,7 @@
-- ----------------------------
-- records of t_flink_app
-- ----------------------------
-insert into "public"."t_flink_app" values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, false, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, true, null, null, null, null, null, null, false, null, null, null, 'streampark,test');
+insert into "public"."t_flink_app" values (100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, false, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, true, null, null, null, null, null, null, false, null, null, null, 'streampark,test');
-- ----------------------------
-- records of t_flink_effective
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
index 3e805f692..5e3ef9e79 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/mysql-schema.sql
@@ -77,6 +77,7 @@ create table `t_flink_app` (
`app_type` tinyint default null,
`duration` bigint default null,
`job_id` varchar(64) collate utf8mb4_general_ci default null,
+ `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
`version_id` bigint default null,
`cluster_id` varchar(255) collate utf8mb4_general_ci default null,
`k8s_namespace` varchar(255) collate utf8mb4_general_ci default null,
@@ -182,6 +183,7 @@ create table `t_flink_log` (
`id` bigint not null auto_increment,
`app_id` bigint default null,
`yarn_app_id` varchar(50) collate utf8mb4_general_ci default null,
+ `job_manager_url` varchar(255) collate utf8mb4_general_ci default null,
`success` tinyint default null,
`exception` text collate utf8mb4_general_ci,
`option_time` datetime default null,
diff --git a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
index b596b9e58..025960e00 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/schema/pgsql-schema.sql
@@ -214,6 +214,7 @@ create table "public"."t_flink_app" (
"app_type" int2,
"duration" int8,
"job_id" varchar(64) collate "pg_catalog"."default",
+ "job_manager_url" varchar(255) collate "pg_catalog"."default",
"version_id" int8,
"cluster_id" varchar(255) collate "pg_catalog"."default",
"k8s_namespace" varchar(255) collate "pg_catalog"."default",
@@ -407,6 +408,7 @@ create table "public"."t_flink_log" (
"id" int8 not null default nextval('streampark_t_flink_log_id_seq'::regclass),
"app_id" int8,
"yarn_app_id" varchar(50) collate "pg_catalog"."default",
+ "job_manager_url" varchar(255) collate "pg_catalog"."default",
"success" int2,
"exception" text collate "pg_catalog"."default",
"option_time" timestamp(6)
diff --git a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql
index 18bcac658..59dcedbc8 100644
--- a/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql
+++ b/streampark-console/streampark-console-service/src/assembly/script/upgrade/mysql-upgrade.sql
@@ -128,6 +128,10 @@ alter table `t_flink_app` add column `default_mode_ingress` text collate utf8mb4
alter table `t_flink_app` add column `modify_time` datetime not null default current_timestamp on update current_timestamp after create_time;
-- add tags field
alter table `t_flink_app` add column `tags` varchar(500) default null;
+-- add job_manager_url field to t_flink_app
+alter table `t_flink_app` add column `job_manager_url` varchar(255) default null after `job_id`;
+-- add job_manager_url field to t_flink_log
+alter table `t_flink_log` add column `job_manager_url` varchar(255) default null after `yarn_app_id`;
alter table `t_flink_project`
change column `date` `create_time` datetime default null,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 7264e0363..88c78c051 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -99,6 +99,12 @@ public class Application implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobId;
+ /**
+ * The address of the jobmanager, that is, the direct access address of the Flink web UI
+ */
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
+ private String jobManagerUrl;
+
/**
* flink version
*/
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
index f7ef677a8..22e0164b3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationLog.java
@@ -40,6 +40,10 @@ public class ApplicationLog {
* applicationId
*/
private String yarnAppId;
+ /**
+ * The address of the jobmanager, that is, the direct access address of the Flink web UI
+ */
+ private String jobManagerUrl;
/**
* start status
*/
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 4c8c10693..3dee66b95 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1379,6 +1379,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
application.setJobId(submitResponse.jobId());
}
+ if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
+ application.setJobManagerUrl(submitResponse.jobManagerUrl());
+ applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
+ }
application.setFlameGraph(appParam.getFlameGraph());
applicationLog.setYarnAppId(submitResponse.clusterId());
application.setStartTime(new Date());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 84ad10a68..62f7c53d8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -724,8 +724,14 @@ public class FlinkTrackingTask {
if (appId != null) {
if (application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION) ||
application.getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
- String format = "proxy/%s/overview";
- String reqURL = String.format(format, appId);
+ String reqURL;
+ if (StringUtils.isEmpty(application.getJobManagerUrl())) {
+ String format = "proxy/%s/overview";
+ reqURL = String.format(format, appId);
+ } else {
+ String format = "%s/overview";
+ reqURL = String.format(format, application.getJobManagerUrl());
+ }
return yarnRestRequest(reqURL, Overview.class);
// TODO: yarn-session
//String remoteUrl = getFlinkClusterRestUrl(flinkCluster, flinkUrl);
@@ -738,8 +744,14 @@ public class FlinkTrackingTask {
private JobsOverview httpJobsOverview(Application application, FlinkCluster flinkCluster) throws Exception {
final String flinkUrl = "jobs/overview";
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- String format = "proxy/%s/" + flinkUrl;
- String reqURL = String.format(format, application.getAppId());
+ String reqURL;
+ if (StringUtils.isEmpty(application.getJobManagerUrl())) {
+ String format = "proxy/%s/" + flinkUrl;
+ reqURL = String.format(format, application.getAppId());
+ } else {
+ String format = "%s/" + flinkUrl;
+ reqURL = String.format(format, application.getJobManagerUrl());
+ }
JobsOverview jobsOverview = yarnRestRequest(reqURL, JobsOverview.class);
if (jobsOverview != null && ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
//过滤出当前job
@@ -765,8 +777,14 @@ public class FlinkTrackingTask {
private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkCluster) throws IOException {
final String flinkUrl = "jobs/%s/checkpoints";
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- String format = "proxy/%s/" + flinkUrl;
- String reqURL = String.format(format, application.getAppId(), application.getJobId());
+ String reqURL;
+ if (StringUtils.isEmpty(application.getJobManagerUrl())) {
+ String format = "proxy/%s/" + flinkUrl;
+ reqURL = String.format(format, application.getAppId(), application.getJobId());
+ } else {
+ String format = "%s/" + flinkUrl;
+ reqURL = String.format(format, application.getJobManagerUrl(), application.getJobId());
+ }
return yarnRestRequest(reqURL, CheckPoints.class);
} else if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
if (application.getJobId() != null) {
@@ -792,5 +810,4 @@ public class FlinkTrackingTask {
}
return JacksonUtils.read(result, clazz);
}
-
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
index ab0a4edeb..b53b80e95 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
@@ -24,6 +24,7 @@
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="job_id" jdbcType="BIGINT" property="jobId"/>
<result column="yarn_app_id" jdbcType="VARCHAR" property="yarnAppId"/>
+ <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
<result column="success" jdbcType="INTEGER" property="success"/>
<result column="exception" jdbcType="LONGVARCHAR" property="exception"/>
<result column="option_time" jdbcType="DATE" property="optionTime"/>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index b110fd6f2..b73ef2a5b 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -41,6 +41,7 @@
<result column="jar_check_sum" jdbcType="VARCHAR" property="jarCheckSum"/>
<result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
<result column="job_id" jdbcType="VARCHAR" property="jobId"/>
+ <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
<result column="user_id" jdbcType="BIGINT" property="userId"/>
<result column="start_time" jdbcType="DATE" property="startTime"/>
<result column="end_time" jdbcType="DATE" property="endTime"/>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
index 4a7a4d80b..fd8893aa3 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
@@ -485,6 +485,11 @@
slot-scope="text, record">
<span class="pointer" @click="handleView(record.yarnAppId)">{{ record.yarnAppId }}</span>
</template>
+ <template
+ slot="jobManagerUrl"
+ slot-scope="text, record">
+ <span class="pointer" @click="handleJobManagerUrlView(record.jobManagerUrl)">{{ record.jobManagerUrl }}</span>
+ </template>
<template
slot="optionTime"
slot-scope="text, record">
@@ -967,9 +972,15 @@ export default {
{
title: 'Application Id',
dataIndex: 'yarnAppId',
- width: '40%',
+ width: '20%',
scopedSlots: {customRender: 'yarnAppId'}
},
+ {
+ title: 'JobManager URL',
+ dataIndex: 'jobManagerUrl',
+ width: '25%',
+ scopedSlots: {customRender: 'jobManagerUrl'}
+ },
{
title: 'Start Status',
dataIndex: 'success',
@@ -1159,6 +1170,10 @@ export default {
}
},
+ handleJobManagerUrlView(url) { // open the JobManager URL of the clicked log row; fall back to the current app's URL when no argument is passed
+ window.open(url || this.app.jobManagerUrl)
+ },
+
handleSavePoint() {
const params = {
appId: this.app.id
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
index 2ff47dca5..5ce981a63 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitResponse.scala
@@ -22,6 +22,7 @@ import javax.annotation.Nullable
case class SubmitResponse(clusterId: String,
flinkConfig: JavaMap[String, String],
- @Nullable jobId: String = "") {
+ @Nullable jobId: String = "",
+ @Nullable jobManagerUrl: String = "") {
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
index 5ee29c76b..278c271c3 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnApplicationSubmit.scala
@@ -121,8 +121,10 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
val applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig)
var applicationId: ApplicationId = null
+ var jobManagerUrl: String = null
clusterClient = clusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration).getClusterClient
applicationId = clusterClient.getClusterId
+ jobManagerUrl = clusterClient.getWebInterfaceURL;
logInfo(
s"""
|-------------------------<<applicationId>>------------------------
@@ -130,7 +132,7 @@ object YarnApplicationSubmit extends YarnSubmitTrait {
|__________________________________________________________________
|""".stripMargin)
- SubmitResponse(applicationId.toString, flinkConfig.toMap)
+ SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl);
} finally {
Utils.close(clusterDescriptor, clusterClient)
}
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
index 410e927e0..664954030 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/YarnPerJobSubmit.scala
@@ -101,6 +101,7 @@ object YarnPerJobSubmit extends YarnSubmitTrait {
}
val applicationId = clusterClient.getClusterId
+ val jobManagerUrl = clusterClient.getWebInterfaceURL
logInfo(
s"""
|-------------------------<<applicationId>>------------------------
@@ -108,7 +109,7 @@ object YarnPerJobSubmit extends YarnSubmitTrait {
|__________________________________________________________________
|""".stripMargin)
- SubmitResponse(applicationId.toString, flinkConfig.toMap)
+ SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl)
} finally {
if (submitRequest.safePackageProgram) {
Utils.close(packagedProgram)