You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/04 06:32:24 UTC

[incubator-streampark] branch dev updated: [Improve] k8s mode job remapping Improvement (#1961)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3ad16a44c [Improve] k8s mode job remapping Improvement (#1961)
3ad16a44c is described below

commit 3ad16a44cf4c13825ded7576e040aeab80373d97
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 4 14:32:17 2022 +0800

    [Improve] k8s mode job remapping Improvement (#1961)
---
 .../src/views/flink/app/hooks/useApp.tsx           | 24 ++++++++++------------
 .../console/core/mapper/ApplicationMapper.java     |  1 -
 .../core/service/impl/ApplicationServiceImpl.java  |  2 +-
 .../resources/mapper/core/ApplicationMapper.xml    | 17 +++++++++++++++
 4 files changed, 29 insertions(+), 15 deletions(-)

diff --git a/streampark-console/streampark-console-newui/src/views/flink/app/hooks/useApp.tsx b/streampark-console/streampark-console-newui/src/views/flink/app/hooks/useApp.tsx
index c79c633d0..7911d6ce8 100644
--- a/streampark-console/streampark-console-newui/src/views/flink/app/hooks/useApp.tsx
+++ b/streampark-console/streampark-console-newui/src/views/flink/app/hooks/useApp.tsx
@@ -283,23 +283,22 @@ export const useFlinkApplication = (openStartModal: Fn) => {
             name="mappingForm"
             labelCol={{ lg: { span: 7 }, sm: { span: 7 } }}
             wrapperCol={{ lg: { span: 16 }, sm: { span: 4 } }}
-            v-model:model={formValue}
-          >
+            v-model:model={formValue}>
             <Form.Item label="Application Name">
               <Alert message={app.jobName} type="info" />
             </Form.Item>
-            <Form.Item
-              label="Application Id"
-              name="appId"
-              rules={[{ required: true, message: 'ApplicationId is required' }]}
-            >
-              <Input type="text" placeholder="ApplicationId" v-model:value={formValue.appId} />
-            </Form.Item>
+            {[2, 3, 4].includes(app.executionMode) && (
+              <Form.Item
+                label="YARN Application Id"
+                name="appId"
+                rules={[{ required: true, message: 'YARN ApplicationId is required' }]}>
+                <Input type="text" placeholder="ApplicationId" v-model:value={formValue.appId} />
+              </Form.Item>
+            )}
             <Form.Item
               label="JobId"
               name="jobId"
-              rules={[{ required: true, message: 'ApplicationId is required' }]}
-            >
+              rules={[{ required: true, message: 'ApplicationId is required' }]}>
               <Input type="text" placeholder="JobId" v-model:value={formValue.jobId} />
             </Form.Item>
           </Form>
@@ -310,11 +309,10 @@ export const useFlinkApplication = (openStartModal: Fn) => {
       onOk: async () => {
         try {
           await mappingRef.value.validate();
-          console.log(formValue);
           await fetchMapping({
             id: app.id,
             appId: formValue.appId,
-            jobId: formValue.jobId,
+            jobId: formValue.jobId
           });
           Swal.fire({
             icon: 'success',
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 8c8985193..51a0788ae 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -40,7 +40,6 @@ public interface ApplicationMapper extends BaseMapper<Application> {
 
     List<Application> getByTeamId(@Param("teamId") Long teamId);
 
-    @Update("update t_flink_app set app_id=#{application.appId},job_id=#{application.jobId},state=14,end_time=null where id=#{application.id}")
     boolean mapping(@Param("application") Application appParam);
 
     @Update("update t_flink_app set option_state=0")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8efc9d4c6..1b2996966 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1032,7 +1032,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         boolean mapping = this.baseMapper.mapping(appParam);
         Application application = getById(appParam.getId());
         if (isKubernetesApp(application)) {
-            k8SFlinkTrackMonitor.unTrackingJob(toTrackId(application));
+            k8SFlinkTrackMonitor.trackingJob(toTrackId(application));
         } else {
             FlinkTrackingTask.addTracking(application);
         }
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 0dc29da48..9d3cd3c15 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -31,6 +31,7 @@
         <result column="job_name" jdbcType="VARCHAR" property="jobName"/>
         <result column="app_id" jdbcType="VARCHAR" property="appId"/>
         <result column="version_id" jdbcType="BIGINT" property="versionId"/>
+        <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
         <result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
         <result column="k8s_namespace" jdbcType="VARCHAR" property="k8sNamespace"/>
         <result column="app_type" jdbcType="INTEGER" property="appType"/>
@@ -199,4 +200,20 @@
         </set>
         where id=#{application.id}
     </update>
+
+    <update id="mapping" parameterType="application">
+        update t_flink_app
+        <set>
+            <if test="application.jobId != null">
+                job_id=#{application.jobId},
+            </if>
+            <if test="application.appId != null">
+                app_id=#{application.appId},
+            </if>
+            end_time=null,
+            `state`=14
+        </set>
+        where id=#{application.id}
+    </update>
+
 </mapper>