You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/01 12:21:45 UTC

[incubator-streampark] branch mapping updated: [bug] Resolve k8s mode mapping is not available

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch mapping
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/mapping by this push:
     new 998314585 [bug] Resolve k8s mode mapping is not available
998314585 is described below

commit 998314585d1819e59bbfca460f5335b603589116
Author: Monster <25...@qq.com>
AuthorDate: Tue Nov 1 19:28:04 2022 +0800

    [bug] Resolve k8s mode mapping is not available
---
 .../org/apache/streampark/console/core/mapper/ApplicationMapper.java | 5 ++++-
 .../streampark/console/core/service/impl/ApplicationServiceImpl.java | 5 +++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 8c8985193..bce872bb4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -40,7 +40,10 @@ public interface ApplicationMapper extends BaseMapper<Application> {
 
     List<Application> getByTeamId(@Param("teamId") Long teamId);
 
-    @Update("update t_flink_app set app_id=#{application.appId},job_id=#{application.jobId},state=14,end_time=null where id=#{application.id}")
+    @Update("update t_flink_app set job_id=#{application.jobId},state=5,end_time=null where id=#{application.id}")
+    boolean historyJobMapping(@Param("application") Application appParam);
+
+    @Update("update t_flink_app set state=5,end_time=null where id=#{application.id}")
     boolean mapping(@Param("application") Application appParam);
 
     @Update("update t_flink_app set option_state=0")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8efc9d4c6..729e2bb10 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -130,6 +130,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1029,10 +1030,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     @Override
     @RefreshCache
     public boolean mapping(Application appParam) {
-        boolean mapping = this.baseMapper.mapping(appParam);
+        boolean mapping = Optional.ofNullable(appParam.getJobId()).isPresent() ? this.baseMapper.historyJobMapping(appParam) : this.baseMapper.mapping(appParam);
         Application application = getById(appParam.getId());
         if (isKubernetesApp(application)) {
-            k8SFlinkTrackMonitor.unTrackingJob(toTrackId(application));
+            k8SFlinkTrackMonitor.trackingJob(toTrackId(application));
         } else {
             FlinkTrackingTask.addTracking(application);
         }