You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/10 14:09:05 UTC
[incubator-streampark] branch dev updated: [Improve] Unified Mapper (#1951)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 73b4bcc4d [Improve] Unified Mapper (#1951)
73b4bcc4d is described below
commit 73b4bcc4dd1fa485ff18bcd5ad79f5d733246fbe
Author: ChunFu Wu <31...@qq.com>
AuthorDate: Thu Nov 10 22:08:59 2022 +0800
[Improve] Unified Mapper (#1951)
* [Improve] Unified Mapper
---
.../core/mapper/ApplicationConfigMapper.java | 11 --
.../console/core/mapper/ApplicationLogMapper.java | 11 --
.../console/core/mapper/ApplicationMapper.java | 36 +------
.../console/core/mapper/EffectiveMapper.java | 8 --
.../console/core/mapper/FlameGraphMapper.java | 11 --
.../console/core/mapper/FlinkClusterMapper.java | 7 --
.../console/core/mapper/FlinkEnvMapper.java | 6 +-
.../console/core/mapper/FlinkSqlMapper.java | 4 +-
.../console/core/mapper/ProjectMapper.java | 14 +--
.../console/core/mapper/SavePointMapper.java | 25 -----
.../console/core/mapper/SettingMapper.java | 8 --
.../console/core/mapper/TutorialMapper.java | 4 -
.../console/core/mapper/VariableMapper.java | 1 +
.../core/service/impl/AppBuildPipeServiceImpl.java | 13 +--
.../service/impl/ApplicationBackUpServiceImpl.java | 6 +-
.../service/impl/ApplicationConfigServiceImpl.java | 20 ++--
.../service/impl/ApplicationLogServiceImpl.java | 12 ++-
.../core/service/impl/ApplicationServiceImpl.java | 47 +++++----
.../core/service/impl/EffectiveServiceImpl.java | 13 ++-
.../core/service/impl/FlameGraphServiceImpl.java | 42 ++++----
.../core/service/impl/FlinkClusterServiceImpl.java | 18 ++--
.../core/service/impl/FlinkEnvServiceImpl.java | 6 +-
.../core/service/impl/FlinkSqlServiceImpl.java | 16 +--
.../core/service/impl/LoggerServiceImpl.java | 1 +
.../core/service/impl/MessageServiceImpl.java | 10 +-
.../core/service/impl/ProjectServiceImpl.java | 29 ++++--
.../core/service/impl/SavePointServiceImpl.java | 36 +++++--
.../core/service/impl/SettingServiceImpl.java | 12 ++-
.../core/service/impl/TutorialServiceImpl.java | 9 +-
.../core/service/impl/VariableServiceImpl.java | 23 ++--
.../console/core/task/ProjectBuildTask.java | 8 +-
.../system/service/impl/MemberServiceImpl.java | 19 ++--
.../system/service/impl/MenuServiceImpl.java | 5 +-
.../system/service/impl/RoleMenuServiceImpl.java | 11 +-
.../system/service/impl/RoleServiceImpl.java | 6 +-
.../system/service/impl/TeamServiceImpl.java | 4 +-
.../system/service/impl/UserServiceImpl.java | 22 ++--
.../resources/mapper/core/AlertConfigMapper.xml | 2 +-
.../mapper/core/ApplicationConfigMapper.xml | 15 +++
.../resources/mapper/core/ApplicationMapper.xml | 116 ++++++++++++++++++---
.../main/resources/mapper/core/FlinkEnvMapper.xml | 16 +++
.../main/resources/mapper/core/FlinkSqlMapper.xml | 17 ++-
.../main/resources/mapper/core/ProjectMapper.xml | 28 ++---
.../main/resources/mapper/core/VariableMapper.xml | 4 +-
44 files changed, 427 insertions(+), 305 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationConfigMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationConfigMapper.java
index 865f36ae1..80459224e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationConfigMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationConfigMapper.java
@@ -20,23 +20,12 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
public interface ApplicationConfigMapper extends BaseMapper<ApplicationConfig> {
- @Select("select max(`version`) as lastVersion from t_flink_config where app_id=#{appId}")
Integer getLastVersion(@Param("appId") Long appId);
- @Select("select * from t_flink_config where app_id=#{appId}")
- IPage<ApplicationConfig> page(Page<ApplicationConfig> page, @Param("appId") Long appId);
-
- @Select("select s.* from t_flink_config s inner join t_flink_effective e on s.id = e.target_id where e.app_id=#{appId} and e.target_type=1")
ApplicationConfig getEffective(@Param("appId") Long appId);
- @Select("select * from t_flink_config where app_id=#{appId} and latest=true")
- ApplicationConfig getLatest(@Param("appId") Long appId);
-
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java
index d89ff3e24..adc442b86 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java
@@ -17,21 +17,10 @@
package org.apache.streampark.console.core.mapper;
-import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.ibatis.annotations.Delete;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
public interface ApplicationLogMapper extends BaseMapper<ApplicationLog> {
- @Select("SELECT * from t_flink_log where app_id=#{appId}")
- IPage<ApplicationLog> page(Page<Application> page, @Param("appId") Long appId);
-
- @Delete("delete from t_flink_log where app_id=#{appId}")
- void removeApp(@Param("appId") Long appId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index 51a0788ae..61bddd771 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -23,65 +23,31 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
import java.util.List;
public interface ApplicationMapper extends BaseMapper<Application> {
+
IPage<Application> page(Page<Application> page, @Param("application") Application application);
Application getApp(@Param("application") Application application);
void updateTracking(@Param("application") Application application);
- @Select("select * from t_flink_app where project_id=#{projectId}")
- List<Application> getByProjectId(@Param("projectId") Long projectId);
-
List<Application> getByTeamId(@Param("teamId") Long teamId);
boolean mapping(@Param("application") Application appParam);
- @Update("update t_flink_app set option_state=0")
- void resetOptionState();
-
- @Select("select k8s_namespace from " +
- "(select k8s_namespace, max(create_time) as ct from t_flink_app " +
- "where k8s_namespace is not null group by k8s_namespace order by ct desc) as ns " +
- "limit #{limitSize}")
List<String> getRecentK8sNamespace(@Param("limitSize") int limit);
- @Select("select cluster_id from " +
- "(select cluster_id, max(create_time) as ct from t_flink_app " +
- "where cluster_id is not null and execution_mode = #{executionMode} group by cluster_id order by ct desc) as ci " +
- "limit #{limitSize}")
List<String> getRecentK8sClusterId(@Param("executionMode") int executionMode, @Param("limitSize") int limit);
- @Select("select flink_image from " +
- "(select flink_image, max(create_time) as ct from t_flink_app " +
- "where flink_image is not null and execution_mode = 6 group by flink_image order by ct desc) as fi " +
- "limit #{limitSize}")
List<String> getRecentFlinkBaseImage(@Param("limitSize") int limit);
- @Select("select k8s_pod_template from " +
- "(select k8s_pod_template, max(create_time) as ct from t_flink_app " +
- "where k8s_pod_template is not null and k8s_pod_template <> '' and execution_mode = 6 " +
- "group by k8s_pod_template order by ct desc) as pt " +
- "limit #{limitSize}")
List<String> getRecentK8sPodTemplate(@Param("limitSize") int limit);
- @Select("select k8s_jm_pod_template from " +
- "(select k8s_jm_pod_template, max(create_time) as ct from t_flink_app " +
- "where k8s_jm_pod_template is not null and k8s_jm_pod_template <> '' and execution_mode = 6 " +
- "group by k8s_jm_pod_template order by ct desc) as pt " +
- "limit #{limitSize}")
List<String> getRecentK8sJmPodTemplate(@Param("limitSize") int limit);
- @Select("select k8s_tm_pod_template from " +
- "(select k8s_tm_pod_template, max(create_time) as ct from t_flink_app " +
- "where k8s_tm_pod_template is not null and k8s_tm_pod_template <> '' and execution_mode = 6 " +
- "group by k8s_tm_pod_template order by ct desc) as pt " +
- "limit #{limitSize}")
List<String> getRecentK8sTmPodTemplate(@Param("limitSize") int limit);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
index 28389c611..ba9a6db19 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/EffectiveMapper.java
@@ -20,15 +20,7 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.Effective;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Delete;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
public interface EffectiveMapper extends BaseMapper<Effective> {
- @Select("select * from t_flink_effective where app_id=#{appId} and target_type=#{type}")
- Effective get(@Param("appId") Long appId, @Param("type") int type);
-
- @Delete("delete from t_flink_effective where app_id=#{appId}")
- void removeApp(@Param("appId") Long appId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlameGraphMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlameGraphMapper.java
index 7ed6a7763..52ebcd442 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlameGraphMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlameGraphMapper.java
@@ -20,18 +20,7 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.FlameGraph;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Delete;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-
-import java.util.Date;
-import java.util.List;
public interface FlameGraphMapper extends BaseMapper<FlameGraph> {
- @Select("select * from t_flame_graph where app_id=#{appId} and timeline between #{start} and #{end} order by timeline asc")
- List<FlameGraph> getFlameGraph(@Param("appId") Long appId, @Param("start") Date start, @Param("end") Date end);
-
- @Delete("delete from t_flame_graph where timeline < #{end}")
- void clean(Date end);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
index a1afb77eb..b2cb088bd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkClusterMapper.java
@@ -20,14 +20,7 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.FlinkCluster;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
public interface FlinkClusterMapper extends BaseMapper<FlinkCluster> {
- @Select("SELECT * from t_flink_cluster where cluster_name=#{clusterName}")
- FlinkCluster getByName(@Param("clusterName") String clusterName);
-
- @Select("SELECT * from t_flink_cluster where cluster_id=#{clusterId}")
- FlinkCluster getByClusterId(@Param("clusterId") String clusterId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEnvMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEnvMapper.java
index 2a4c3dcd7..4a3b0b456 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEnvMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkEnvMapper.java
@@ -21,14 +21,10 @@ import org.apache.streampark.console.core.entity.FlinkEnv;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
public interface FlinkEnvMapper extends BaseMapper<FlinkEnv> {
- @Select("select v.* from t_flink_env v inner join (select version_id from t_flink_app where id=#{appId}) as t on v.id = t.version_id")
FlinkEnv getByAppId(@Param("appId") Long appId);
- @Update("update t_flink_env set is_default = case id when #{id} then true else false end")
- void setDefault(Long id);
+ void setDefault(@Param("id") Long id);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
index 450c6c383..cc1ee3ec9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkSqlMapper.java
@@ -21,15 +21,13 @@ import org.apache.streampark.console.core.entity.FlinkSql;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
import java.util.List;
public interface FlinkSqlMapper extends BaseMapper<FlinkSql> {
- @Select("select s.* from t_flink_sql s inner join t_flink_effective e on s.id = e.target_id where e.target_type=2 and e.app_id=#{appId}")
+
FlinkSql getEffective(@Param("appId") Long appId);
- @Select("select max(`version`) as maxVersion from t_flink_sql where app_id=#{appId}")
Integer getLatestVersion(@Param("appId") Long appId);
List<FlinkSql> getByTeamId(@Param("teamId") Long teamId);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ProjectMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ProjectMapper.java
index c11b4de96..c5ce5db8f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ProjectMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ProjectMapper.java
@@ -20,22 +20,14 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.Project;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Update;
public interface ProjectMapper extends BaseMapper<Project> {
- IPage<Project> page(Page<Project> page, @Param("project") Project project);
+ void updateFailureBuildById(@Param("project") Project project);
- @Update("update t_flink_project set BUILD_STATE=2 where id=#{project.id}")
- void failureBuild(@Param("project") Project project);
+ void updateSuccessBuildById(@Param("project") Project project);
- @Update("update t_flink_project set LAST_BUILD = now(),BUILD_STATE=1 where id=#{project.id}")
- void successBuild(@Param("project") Project project);
-
- @Update("update t_flink_project set BUILD_STATE=0 where id=#{project.id}")
- void startBuild(@Param("project") Project project);
+ void updateStartBuildById(@Param("project") Project project);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
index 0fea26a43..2b67d02cc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SavePointMapper.java
@@ -20,32 +20,7 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.SavePoint;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.ibatis.annotations.Delete;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
-
-import java.util.Date;
public interface SavePointMapper extends BaseMapper<SavePoint> {
- @Update("update t_flink_savepoint set latest=false where app_id=#{appId}")
- void obsolete(@Param("appId") Long appId);
-
- @Select("select * from t_flink_savepoint where app_id=#{appId} and latest=true")
- SavePoint getLatest(@Param("appId") Long appId);
-
- @Select("select * from t_flink_savepoint where app_id=#{appId}")
- IPage<SavePoint> page(Page<SavePoint> page, @Param("appId") Long appId);
-
- @Delete("delete from t_flink_savepoint where app_id=#{appId}")
- void removeApp(@Param("appId") Long appId);
-
- @Delete("delete from t_flink_savepoint where app_id=#{appId} and type = 1 and trigger_time < #{trigger}")
- void expire(@Param("appId") Long appId, @Param("trigger") Date triggerTime);
-
- @Delete("delete from t_flink_savepoint where app_id=#{appId} and type = 1")
- void expireAll(@Param("appId") Long appId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SettingMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SettingMapper.java
index d28d3b96c..d3d58b805 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SettingMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SettingMapper.java
@@ -20,15 +20,7 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.Setting;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
-import org.apache.ibatis.annotations.Update;
public interface SettingMapper extends BaseMapper<Setting> {
- @Select("select * from t_setting where setting_key=#{key}")
- Setting get(@Param("key") String key);
-
- @Update("update t_setting set setting_value = #{setting.settingValue} where setting_key = #{setting.settingKey}")
- void updateByKey(@Param("setting") Setting setting);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/TutorialMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/TutorialMapper.java
index 52692a8ab..e33520e9a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/TutorialMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/TutorialMapper.java
@@ -20,11 +20,7 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.Tutorial;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import org.apache.ibatis.annotations.Param;
-import org.apache.ibatis.annotations.Select;
public interface TutorialMapper extends BaseMapper<Tutorial> {
- @Select("select * from t_flink_tutorial where name=#{name}")
- Tutorial getByName(@Param("name") String name);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/VariableMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/VariableMapper.java
index fd60c541b..db3b84917 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/VariableMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/VariableMapper.java
@@ -27,6 +27,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface VariableMapper extends BaseMapper<Variable> {
+
IPage<Variable> page(Page<Variable> page, @Param("variable") Variable variable);
List<Variable> selectByTeamId(@Param("teamId") Long teamId, @Param("keyword") String keyword);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 10cb85b7d..866b4c40b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -70,7 +70,6 @@ import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline
import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -234,7 +233,7 @@ public class AppBuildPipeServiceImpl
} else {
app.setOptionState(OptionState.NONE.getValue());
app.setLaunch(LaunchState.DONE.get());
- //如果当前任务未运行,或者刚刚新增的任务,则直接将候选版本的设置为正式版本
+ // If the current task is not running, or the task has just been added, directly set the candidate version to the official version
if (app.isFlinkSqlJob()) {
applicationService.toEffective(app);
} else {
@@ -387,7 +386,8 @@ public class AppBuildPipeServiceImpl
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
return FlinkK8sApplicationBuildPipeline.of(k8sApplicationBuildRequest);
default:
- throw new UnsupportedOperationException("Unsupported Building Application for ExecutionMode: " + app.getExecutionModeEnum());
+ throw new UnsupportedOperationException(
+ "Unsupported Building Application for ExecutionMode: " + app.getExecutionModeEnum());
}
}
@@ -444,9 +444,10 @@ public class AppBuildPipeServiceImpl
if (CollectionUtils.isEmpty(appIds)) {
return Maps.newHashMap();
}
- QueryWrapper<AppBuildPipeline> query = new QueryWrapper<>();
- query.select("app_id", "pipe_status").in("app_id", appIds);
- List<Map<String, Object>> rMaps = baseMapper.selectMaps(query);
+ LambdaQueryWrapper<AppBuildPipeline> queryWrapper = new LambdaQueryWrapper<AppBuildPipeline>()
+ .select(AppBuildPipeline::getAppId, AppBuildPipeline::getPipeStatus)
+ .in(AppBuildPipeline::getAppId, appIds);
+ List<Map<String, Object>> rMaps = baseMapper.selectMaps(queryWrapper);
if (CollectionUtils.isEmpty(rMaps)) {
return Maps.newHashMap();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
index 79bafaa88..f106adb8c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
@@ -85,7 +85,7 @@ public class ApplicationBackUpServiceImpl
@Override
public IPage<ApplicationBackUp> page(ApplicationBackUp backUp, RestRequest request) {
Page<ApplicationBackUp> page = new MybatisPager<ApplicationBackUp>().getDefaultPage(request);
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<ApplicationBackUp>()
+ LambdaQueryWrapper<ApplicationBackUp> queryWrapper = new LambdaQueryWrapper<ApplicationBackUp>()
.eq(ApplicationBackUp::getAppId, backUp.getAppId());
return this.baseMapper.selectPage(page, queryWrapper);
}
@@ -208,8 +208,8 @@ public class ApplicationBackUpServiceImpl
@Override
public boolean isFlinkSqlBacked(Long appId, Long sqlId) {
- LambdaQueryWrapper<ApplicationBackUp> queryWrapper = new LambdaQueryWrapper();
- queryWrapper.eq(ApplicationBackUp::getAppId, appId)
+ LambdaQueryWrapper<ApplicationBackUp> queryWrapper = new LambdaQueryWrapper<ApplicationBackUp>()
+ .eq(ApplicationBackUp::getAppId, appId)
.eq(ApplicationBackUp::getSqlId, sqlId);
return baseMapper.selectCount(queryWrapper) > 0;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index dd1332aaf..9b1001456 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -33,6 +33,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -179,7 +180,10 @@ public class ApplicationConfigServiceImpl
@Override
public ApplicationConfig getLatest(Long appId) {
- return baseMapper.getLatest(appId);
+ LambdaQueryWrapper<ApplicationConfig> queryWrapper = new LambdaQueryWrapper<ApplicationConfig>()
+ .eq(ApplicationConfig::getAppId, appId)
+ .eq(ApplicationConfig::getLatest, true);
+ return this.getOne(queryWrapper);
}
@Override
@@ -201,19 +205,19 @@ public class ApplicationConfigServiceImpl
@Override
public IPage<ApplicationConfig> page(ApplicationConfig config, RestRequest request) {
- return this.baseMapper.page(
- new MybatisPager<ApplicationConfig>().getPage(request, "version", Constant.ORDER_DESC),
- config.getAppId()
- );
+ Page<ApplicationConfig> page = new MybatisPager<ApplicationConfig>().getPage(request, "version", Constant.ORDER_DESC);
+ LambdaQueryWrapper<ApplicationConfig> queryWrapper = new LambdaQueryWrapper<ApplicationConfig>()
+ .eq(ApplicationConfig::getAppId, config.getAppId());
+ return this.page(page, queryWrapper);
}
@Override
public List<ApplicationConfig> history(Application application) {
- LambdaQueryWrapper<ApplicationConfig> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(ApplicationConfig::getAppId, application.getId())
+ LambdaQueryWrapper<ApplicationConfig> queryWrapper = new LambdaQueryWrapper<ApplicationConfig>()
+ .eq(ApplicationConfig::getAppId, application.getId())
.orderByDesc(ApplicationConfig::getVersion);
- List<ApplicationConfig> configList = this.baseMapper.selectList(wrapper);
+ List<ApplicationConfig> configList = this.baseMapper.selectList(queryWrapper);
ApplicationConfig effective = getEffective(application.getId());
if (effective != null) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
index 4293bc23d..f5c44c87d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
@@ -20,11 +20,11 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.console.base.domain.Constant;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
-import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.mapper.ApplicationLogMapper;
import org.apache.streampark.console.core.service.ApplicationLogService;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -41,12 +41,16 @@ public class ApplicationLogServiceImpl extends ServiceImpl<ApplicationLogMapper,
@Override
public IPage<ApplicationLog> page(ApplicationLog applicationLog, RestRequest request) {
- Page<Application> page = new MybatisPager<Application>().getPage(request, "option_time", Constant.ORDER_DESC);
- return this.baseMapper.page(page, applicationLog.getAppId());
+ Page<ApplicationLog> page = new MybatisPager<ApplicationLog>().getPage(request, "option_time", Constant.ORDER_DESC);
+ LambdaQueryWrapper<ApplicationLog> queryWrapper = new LambdaQueryWrapper<ApplicationLog>()
+ .eq(ApplicationLog::getAppId, applicationLog.getAppId());
+ return this.page(page, queryWrapper);
}
@Override
public void removeApp(Long appId) {
- baseMapper.removeApp(appId);
+ LambdaQueryWrapper<ApplicationLog> queryWrapper = new LambdaQueryWrapper<ApplicationLog>()
+ .eq(ApplicationLog::getAppId, appId);
+ this.remove(queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index e137cbceb..e9b6e95e3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -212,7 +212,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@PostConstruct
public void resetOptionState() {
- this.baseMapper.resetOptionState();
+ Application application = new Application();
+ application.setOptionState(0);
+ this.update(application);
}
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap = new ConcurrentHashMap<>();
@@ -515,7 +517,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public long countByTeamId(Long teamId) {
- return this.count(new LambdaQueryWrapper<Application>().eq(Application::getTeamId, teamId));
+ LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>()
+ .eq(Application::getTeamId, teamId);
+ return this.count(queryWrapper);
}
@Override
@@ -535,9 +539,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (!checkJobName(appParam.getJobName())) {
return AppExistsState.INVALID;
}
- boolean inDB = this.baseMapper.selectCount(
- new LambdaQueryWrapper<Application>()
- .eq(Application::getJobName, appParam.getJobName())) > 0;
+ LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>()
+ .eq(Application::getJobName, appParam.getJobName());
+ boolean inDB = this.baseMapper.selectCount(queryWrapper) > 0;
if (appParam.getId() != null) {
Application app = getById(appParam.getId());
@@ -624,9 +628,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@SneakyThrows
@Transactional(rollbackFor = {Exception.class})
public Long copy(Application appParam) {
- long count = this.baseMapper.selectCount(
- new LambdaQueryWrapper<Application>()
- .eq(Application::getJobName, appParam.getJobName()));
+ LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>()
+ .eq(Application::getJobName, appParam.getJobName());
+ long count = this.baseMapper.selectCount(queryWrapper);
if (count > 0) {
throw new IllegalArgumentException("[StreamPark] Application names cannot be repeated");
}
@@ -878,7 +882,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public List<Application> getByProjectId(Long id) {
- return baseMapper.getByProjectId(id);
+ LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>()
+ .eq(Application::getProjectId, id);
+ return this.list(queryWrapper);
}
@Override
@@ -1112,7 +1118,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
extraParameter
);
- CompletableFuture<CancelResponse> cancelFuture = CompletableFuture.supplyAsync(() -> FlinkSubmitter.cancel(cancelRequest), executorService);
+ CompletableFuture<CancelResponse> cancelFuture =
+ CompletableFuture.supplyAsync(() -> FlinkSubmitter.cancel(cancelRequest), executorService);
cancelFutureMap.put(application.getId(), cancelFuture);
@@ -1186,10 +1193,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
final String pathPart = uri.getPath();
String error = null;
if (scheme == null) {
- error = "This state.savepoints.dir value " + savepointPath + " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI.";
+ error = "This state.savepoints.dir value " + savepointPath +
+ " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI.";
} else if (pathPart == null) {
- error = "This state.savepoints.dir value " + savepointPath + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
- } else if (pathPart.length() == 0 || pathPart.equals("/")) {
+ error = "This state.savepoints.dir value " + savepointPath +
+ " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
+ } else if (pathPart.length() == 0 || "/".equals(pathPart)) {
error = "This state.savepoints.dir value " + savepointPath + " Cannot use the root directory for checkpoints.";
}
return error;
@@ -1263,14 +1272,15 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
appConf = String.format("%s://%s", applicationConfig.configType(), applicationConfig.getContent());
break;
case APACHE_FLINK:
- appConf = String.format("json://{\"%s\":\"%s\"}", ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+ appConf = String.format("json://{\"%s\":\"%s\"}", ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(),
+ application.getMainClass());
break;
default:
throw new IllegalArgumentException("[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
}
}
- if (executionMode.equals(ExecutionMode.YARN_APPLICATION)) {
+ if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
flinkUserJar = String.format("%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
@@ -1291,7 +1301,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// 2) appConfig
appConf = applicationConfig == null ? null : String.format("yaml://%s", applicationConfig.getContent());
// 3) client
- if (executionMode.equals(ExecutionMode.YARN_APPLICATION)) {
+ if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
String clientPath = Workspace.remote().APP_CLIENT();
flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
}
@@ -1356,7 +1366,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
AssertUtils.state(buildPipeline != null);
BuildResult buildResult = buildPipeline.getBuildResult();
- if (executionMode.equals(ExecutionMode.YARN_APPLICATION)) {
+ if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
} else {
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
@@ -1410,7 +1420,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
extraParameter
);
- CompletableFuture<SubmitResponse> future = CompletableFuture.supplyAsync(() -> FlinkSubmitter.submit(submitRequest), executorService);
+ CompletableFuture<SubmitResponse> future =
+ CompletableFuture.supplyAsync(() -> FlinkSubmitter.submit(submitRequest), executorService);
startFutureMap.put(application.getId(), future);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
index bcae2cf23..db877b8ff 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/EffectiveServiceImpl.java
@@ -40,15 +40,18 @@ public class EffectiveServiceImpl extends ServiceImpl<EffectiveMapper, Effective
@Override
public void delete(Long appId, EffectiveType effectiveType) {
- LambdaQueryWrapper<Effective> queryWrapper = new LambdaQueryWrapper();
- queryWrapper.eq(Effective::getAppId, appId)
+ LambdaQueryWrapper<Effective> queryWrapper = new LambdaQueryWrapper<Effective>()
+ .eq(Effective::getAppId, appId)
.eq(Effective::getTargetType, effectiveType.getType());
baseMapper.delete(queryWrapper);
}
@Override
public Effective get(Long appId, EffectiveType effectiveType) {
- return baseMapper.get(appId, effectiveType.getType());
+ LambdaQueryWrapper<Effective> queryWrapper = new LambdaQueryWrapper<Effective>()
+ .eq(Effective::getAppId, appId)
+ .eq(Effective::getTargetType, effectiveType.getType());
+ return this.getOne(queryWrapper);
}
@Override
@@ -75,6 +78,8 @@ public class EffectiveServiceImpl extends ServiceImpl<EffectiveMapper, Effective
@Override
public void removeApp(Long appId) {
- baseMapper.removeApp(appId);
+ LambdaQueryWrapper<Effective> queryWrapper = new LambdaQueryWrapper<Effective>()
+ .eq(Effective::getAppId, appId);
+ this.remove(queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlameGraphServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlameGraphServiceImpl.java
index 45b2af2cf..516588661 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlameGraphServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlameGraphServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.streampark.console.core.mapper.FlameGraphMapper;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlameGraphService;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
@@ -45,28 +46,29 @@ import java.util.List;
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class FlameGraphServiceImpl extends ServiceImpl<FlameGraphMapper, FlameGraph>
- implements FlameGraphService {
+ implements FlameGraphService {
@Autowired
private ApplicationService applicationService;
@Override
public String generateFlameGraph(FlameGraph flameGraph) throws IOException {
- List<FlameGraph> flameGraphList = this.baseMapper.getFlameGraph(
- flameGraph.getAppId(),
- flameGraph.getStart(),
- flameGraph.getEnd()
- );
+ LambdaQueryWrapper<FlameGraph> queryWrapper = new LambdaQueryWrapper<FlameGraph>()
+ .eq(FlameGraph::getAppId, flameGraph.getAppId())
+ .between(FlameGraph::getTimeline, flameGraph.getStart(), flameGraph.getEnd())
+ .orderByAsc(FlameGraph::getTimeline);
+ List<FlameGraph> flameGraphList = this.list(queryWrapper);
+
if (CommonUtils.notEmpty(flameGraphList)) {
StringBuffer jsonBuffer = new StringBuffer();
flameGraphList.forEach(x -> jsonBuffer.append(x.getUnzipContent()).append("\r\n"));
Application application = applicationService.getById(flameGraph.getAppId());
String jsonName = String.format(
- "%d_%d_%d.json",
- flameGraph.getAppId(),
- flameGraph.getStart().getTime(),
- flameGraph.getEnd().getTime()
+ "%d_%d_%d.json",
+ flameGraph.getAppId(),
+ flameGraph.getStart().getTime(),
+ flameGraph.getEnd().getTime()
);
String jsonPath = new File(WebUtils.getAppTempDir(), jsonName).getAbsolutePath();
String foldedPath = jsonPath.replace(".json", ".folded");
@@ -80,14 +82,14 @@ public class FlameGraphServiceImpl extends ServiceImpl<FlameGraphMapper, FlameGr
String title = application.getJobName().concat(" ___ FlameGraph");
// generate...
List<String> commands = Arrays.asList(
- String.format("python ./stackcollapse.py -i %s > %s ", jsonPath, foldedPath),
- String.format(
- "./flamegraph.pl --title=\"%s\" --width=%d --colors=java %s > %s ",
- title,
- flameGraph.getWidth(),
- foldedPath,
- svgPath
- )
+ String.format("python ./stackcollapse.py -i %s > %s ", jsonPath, foldedPath),
+ String.format(
+ "./flamegraph.pl --title=\"%s\" --width=%d --colors=java %s > %s ",
+ title,
+ flameGraph.getWidth(),
+ foldedPath,
+ svgPath
+ )
);
CommandUtils.execute(flameGraphPath.getAbsolutePath(), commands, (line) -> log.info("flameGraph: {} ", line));
return svgPath;
@@ -97,6 +99,8 @@ public class FlameGraphServiceImpl extends ServiceImpl<FlameGraphMapper, FlameGr
@Override
public void clean(Date end) {
- baseMapper.clean(end);
+ LambdaQueryWrapper<FlameGraph> queryWrapper = new LambdaQueryWrapper<FlameGraph>()
+ .lt(FlameGraph::getTimeline, end);
+ this.remove(queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 89afdd20d..b2cc47bdc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -39,6 +39,7 @@ import org.apache.streampark.flink.submit.bean.KubernetesDeployParam;
import org.apache.streampark.flink.submit.bean.ShutDownRequest;
import org.apache.streampark.flink.submit.bean.ShutDownResponse;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -89,15 +90,18 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
if (null == cluster.getClusterName() || null == cluster.getExecutionMode()) {
return "error";
}
- //1) 检查名称是否重复,是否已经存在
- FlinkCluster flinkCluster = this.baseMapper.getByName(cluster.getClusterName());
+ //1) Check if name is duplicate, if it already exists
+ LambdaQueryWrapper<FlinkCluster> queryWrapper = new LambdaQueryWrapper<FlinkCluster>()
+ .eq(FlinkCluster::getClusterName, cluster.getClusterName());
+ FlinkCluster flinkCluster = this.getOne(queryWrapper);
if (flinkCluster != null) {
- if (cluster.getId() == null || (cluster.getId() != null && !flinkCluster.getId().equals(cluster.getId()))) {
+ boolean isExists = cluster.getId() == null || (cluster.getId() != null && !flinkCluster.getId().equals(cluster.getId()));
+ if (isExists) {
return "exists";
}
}
if (ExecutionMode.REMOTE.equals(cluster.getExecutionModeEnum())) {
- //2) 检查连接是否能连接到
+ //2) Check if the connection can be made to
return cluster.verifyConnection() ? "success" : "fail";
}
return "success";
@@ -113,8 +117,10 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
String clusterId = flinkCluster.getClusterId();
if (StringUtils.isNoneBlank(clusterId)) {
- FlinkCluster inDB = this.baseMapper.getByClusterId(clusterId);
- if (inDB != null) {
+ LambdaQueryWrapper<FlinkCluster> queryWrapper = new LambdaQueryWrapper<FlinkCluster>()
+ .eq(FlinkCluster::getClusterId, clusterId);
+ FlinkCluster inDb = this.getOne(queryWrapper);
+ if (inDb != null) {
result.setMsg("the clusterId" + clusterId + "is already exists,please check!");
result.setStatus(0);
return result;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index dbea94991..325cd0cf5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -45,12 +45,12 @@ public class FlinkEnvServiceImpl extends ServiceImpl<FlinkEnvMapper, FlinkEnv> i
@Override
public boolean exists(FlinkEnv version) {
//1) check name
- LambdaQueryWrapper<FlinkEnv> nameQuery = new LambdaQueryWrapper<FlinkEnv>()
+ LambdaQueryWrapper<FlinkEnv> queryWrapper = new LambdaQueryWrapper<FlinkEnv>()
.eq(FlinkEnv::getFlinkName, version.getFlinkName());
if (version.getId() != null) {
- nameQuery.ne(FlinkEnv::getId, version.getId());
+ queryWrapper.ne(FlinkEnv::getId, version.getId());
}
- return this.count(nameQuery) == 0;
+ return this.count(queryWrapper) == 0;
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 66e98df5a..12099522a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -77,9 +77,9 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
public FlinkSql getLatestFlinkSql(Long appId, boolean decode) {
Page<FlinkSql> page = new Page<>();
page.setCurrent(0).setSize(1).setSearchCount(false);
- LambdaQueryWrapper<FlinkSql> queryWrapper =
- new LambdaQueryWrapper<FlinkSql>().eq(FlinkSql::getAppId, appId)
- .orderByDesc(FlinkSql::getVersion);
+ LambdaQueryWrapper<FlinkSql> queryWrapper = new LambdaQueryWrapper<FlinkSql>()
+ .eq(FlinkSql::getAppId, appId)
+ .orderByDesc(FlinkSql::getVersion);
Page<FlinkSql> flinkSqlPage = baseMapper.selectPage(page, queryWrapper);
if (!flinkSqlPage.getRecords().isEmpty()) {
@@ -120,11 +120,11 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
@Override
public List<FlinkSql> history(Application application) {
- LambdaQueryWrapper<FlinkSql> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(FlinkSql::getAppId, application.getId())
+ LambdaQueryWrapper<FlinkSql> queryWrapper = new LambdaQueryWrapper<FlinkSql>()
+ .eq(FlinkSql::getAppId, application.getId())
.orderByDesc(FlinkSql::getVersion);
- List<FlinkSql> sqlList = this.baseMapper.selectList(wrapper);
+ List<FlinkSql> sqlList = this.baseMapper.selectList(queryWrapper);
FlinkSql effective = getEffective(application.getId(), false);
if (effective != null && !sqlList.isEmpty()) {
for (FlinkSql sql : sqlList) {
@@ -165,7 +165,9 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
@Override
public void removeApp(Long appId) {
- baseMapper.delete(new LambdaQueryWrapper<FlinkSql>().eq(FlinkSql::getAppId, appId));
+ LambdaQueryWrapper<FlinkSql> queryWrapper = new LambdaQueryWrapper<FlinkSql>()
+ .eq(FlinkSql::getAppId, appId);
+ baseMapper.delete(queryWrapper);
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
index ecc171ba2..d8fdbc687 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
@@ -46,6 +46,7 @@ public class LoggerServiceImpl implements LoggerService {
* @param limit limit
* @return log string data
*/
+ @Override
public CompletionStage<String> queryLog(String nameSpace, String jobName, String jobId, int skipLineNum, int limit) {
return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName, jobId)
).exceptionally(e -> {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java
index 902ec4af8..d99ff46e8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java
@@ -27,6 +27,7 @@ import org.apache.streampark.console.core.websocket.WebSocketEndpoint;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -47,9 +48,10 @@ public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message>
@Override
public IPage<Message> getUnRead(NoticeType noticeType, RestRequest request) {
- LambdaQueryWrapper<Message> query = new LambdaQueryWrapper();
- query.eq(Message::getIsRead, false).orderByDesc(Message::getCreateTime);
- query.eq(Message::getType, noticeType.get());
- return this.baseMapper.selectPage(new MybatisPager<Message>().getDefaultPage(request), query);
+ Page<Message> page = new MybatisPager<Message>().getDefaultPage(request);
+ LambdaQueryWrapper<Message> queryWrapper = new LambdaQueryWrapper<Message>()
+ .eq(Message::getIsRead, false).orderByDesc(Message::getCreateTime)
+ .eq(Message::getType, noticeType.get());
+ return this.baseMapper.selectPage(page, queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 47e04cf3e..73fbf7f1e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -45,6 +45,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
@@ -89,8 +90,8 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
@Override
public RestResponse create(Project project) {
- LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(Project::getName, project.getName());
+ LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<Project>()
+ .eq(Project::getName, project.getName());
long count = count(queryWrapper);
RestResponse response = RestResponse.success();
if (count == 0) {
@@ -147,8 +148,8 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
public boolean delete(Long id) {
Project project = getById(id);
AssertUtils.state(project != null);
- LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(Application::getProjectId, id);
+ LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>()
+ .eq(Application::getProjectId, id);
long count = applicationService.count(queryWrapper);
if (count > 0) {
return false;
@@ -165,20 +166,26 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
@Override
public IPage<Project> page(Project project, RestRequest request) {
Page<Project> page = new MybatisPager<Project>().getDefaultPage(request);
- return this.baseMapper.page(page, project);
+ LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<Project>()
+ .eq(Project::getTeamId, project.getTeamId())
+ .like(StringUtils.isNotBlank(project.getName()), Project::getName, project.getName())
+ .eq(project.getBuildState() != null, Project::getBuildState, project.getBuildState());
+ return this.page(page, queryWrapper);
}
@Override
public long countByTeamId(Long teamId) {
- return this.count(new LambdaQueryWrapper<Project>().eq(Project::getTeamId, teamId));
+ LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<Project>()
+ .eq(Project::getTeamId, teamId);
+ return this.count(queryWrapper);
}
@Override
public void build(Long id) throws Exception {
Project project = getById(id);
- this.baseMapper.startBuild(project);
- CompletableFuture<Void> buildTask = CompletableFuture.runAsync(
- new ProjectBuildTask(getBuildLogPath(id), project, baseMapper, applicationService), executorService);
+ this.baseMapper.updateStartBuildById(project);
+ ProjectBuildTask projectBuildTask = new ProjectBuildTask(getBuildLogPath(id), project, baseMapper, applicationService);
+ CompletableFuture<Void> buildTask = CompletableFuture.runAsync(projectBuildTask, executorService);
// TODO May need to define parameters to set the build timeout in the future.
CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
}
@@ -227,9 +234,9 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
return false;
}
}
- LambdaQueryWrapper<Project> wrapper = new LambdaQueryWrapper<Project>()
+ LambdaQueryWrapper<Project> queryWrapper = new LambdaQueryWrapper<Project>()
.eq(Project::getName, project.getName());
- return this.baseMapper.selectCount(wrapper) > 0;
+ return this.baseMapper.selectCount(queryWrapper) > 0;
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 7000a00fd..02a6b2659 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -51,7 +51,11 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
@Override
public void obsolete(Long appId) {
- this.baseMapper.obsolete(appId);
+ SavePoint savePoint = new SavePoint();
+ savePoint.setLatest(false);
+ LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, appId);
+ this.update(savePoint, queryWrapper);
}
@Override
@@ -74,10 +78,13 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
}
if (cpThreshold == 0) {
- this.baseMapper.expireAll(entity.getAppId());
+ LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, entity.getAppId())
+ .eq(SavePoint::getType, 1);
+ this.remove(queryWrapper);
} else {
- LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>();
- queryWrapper.select(SavePoint::getTriggerTime)
+ LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .select(SavePoint::getTriggerTime)
.eq(SavePoint::getAppId, entity.getAppId())
.eq(SavePoint::getType, CheckPointType.CHECKPOINT.get())
.orderByDesc(SavePoint::getTriggerTime);
@@ -85,14 +92,21 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
Page<SavePoint> savePointPage = this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper);
if (!savePointPage.getRecords().isEmpty() && savePointPage.getRecords().size() > cpThreshold) {
SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
- this.baseMapper.expire(entity.getAppId(), savePoint.getTriggerTime());
+ LambdaQueryWrapper<SavePoint> lambdaQueryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, entity.getAppId())
+ .eq(SavePoint::getType, 1)
+ .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
+ this.remove(lambdaQueryWrapper);
}
}
}
@Override
public SavePoint getLatest(Long id) {
- return this.baseMapper.getLatest(id);
+ LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, id)
+ .eq(SavePoint::getLatest, true);
+ return this.getOne(queryWrapper);
}
@Override
@@ -113,13 +127,19 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
@Override
public IPage<SavePoint> page(SavePoint savePoint, RestRequest request) {
Page<SavePoint> page = new MybatisPager<SavePoint>().getPage(request, "trigger_time", Constant.ORDER_DESC);
- return this.baseMapper.page(page, savePoint.getAppId());
+ LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, savePoint.getAppId());
+ return this.page(page, queryWrapper);
}
@Override
public void removeApp(Application application) {
Long appId = application.getId();
- baseMapper.removeApp(application.getId());
+
+ LambdaQueryWrapper<SavePoint> queryWrapper = new LambdaQueryWrapper<SavePoint>()
+ .eq(SavePoint::getAppId, appId);
+ this.remove(queryWrapper);
+
try {
application.getFsOperator().delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
} catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java
index 7680903b7..e860e5e7d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SettingServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.entity.Setting;
import org.apache.streampark.console.core.mapper.SettingMapper;
import org.apache.streampark.console.core.service.SettingService;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -44,7 +45,9 @@ public class SettingServiceImpl extends ServiceImpl<SettingMapper, Setting>
@Override
public Setting get(String key) {
- return baseMapper.get(key);
+ LambdaQueryWrapper<Setting> queryWrapper = new LambdaQueryWrapper<Setting>()
+ .eq(Setting::getSettingKey, key);
+ return this.getOne(queryWrapper);
}
private final Setting emptySetting = new Setting();
@@ -60,7 +63,12 @@ public class SettingServiceImpl extends ServiceImpl<SettingMapper, Setting>
try {
String value = StringUtils.trimToNull(setting.getSettingValue());
setting.setSettingValue(value);
- this.baseMapper.updateByKey(setting);
+
+ Setting entity = new Setting();
+ entity.setSettingValue(setting.getSettingValue());
+ LambdaQueryWrapper<Setting> queryWrapper = new LambdaQueryWrapper<Setting>()
+ .eq(Setting::getSettingKey, setting.getSettingKey());
+ this.update(entity, queryWrapper);
String settingKey = setting.getSettingKey();
if (CommonConfig.MAVEN_REMOTE_URL().key().equals(settingKey)) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TutorialServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TutorialServiceImpl.java
index 4151a5e75..0c83ff8b2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TutorialServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TutorialServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.console.core.entity.Tutorial;
import org.apache.streampark.console.core.mapper.TutorialMapper;
import org.apache.streampark.console.core.service.TutorialService;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -30,10 +31,12 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
-public class TutorialServiceImpl extends ServiceImpl<TutorialMapper, Tutorial>
- implements TutorialService {
+public class TutorialServiceImpl extends ServiceImpl<TutorialMapper, Tutorial> implements TutorialService {
+
@Override
public Tutorial getByName(String name) {
- return this.baseMapper.getByName(name);
+ LambdaQueryWrapper<Tutorial> queryWrapper = new LambdaQueryWrapper<Tutorial>()
+ .eq(Tutorial::getName, name);
+ return this.getOne(queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
index a74bc5267..62ed98c92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/VariableServiceImpl.java
@@ -117,13 +117,15 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
@Override
public Variable findByVariableCode(Long teamId, String variableCode) {
- return baseMapper.selectOne(new LambdaQueryWrapper<Variable>()
+ LambdaQueryWrapper<Variable> queryWrapper = new LambdaQueryWrapper<Variable>()
.eq(Variable::getVariableCode, variableCode)
- .eq(Variable::getTeamId, teamId));
+ .eq(Variable::getTeamId, teamId);
+ return baseMapper.selectOne(queryWrapper);
}
/**
* get variables through team
+ *
* @param teamId
* @return
*/
@@ -134,6 +136,7 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
/**
* Get variables through team and search keywords.
+ *
* @param teamId
* @param keyword Fuzzy search keywords through variable code or description, Nullable.
* @return
@@ -145,8 +148,9 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
/**
* Replace variable with defined variable codes.
+ *
* @param teamId
- * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
+ * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
* @return
*/
@Override
@@ -158,7 +162,8 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
if (CollectionUtils.isEmpty(variables)) {
return mixed;
}
- Map<String, String> variableMap = variables.stream().collect(Collectors.toMap(Variable::getVariableCode, Variable::getVariableValue));
+ Map<String, String> variableMap =
+ variables.stream().collect(Collectors.toMap(Variable::getVariableCode, Variable::getVariableValue));
String restore = mixed;
Matcher matcher = PLACEHOLDER_PATTERN.matcher(restore);
while (matcher.find()) {
@@ -179,7 +184,8 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
private List<Application> getDependApplicationsByCode(Variable variable) {
List<Application> dependApplications = new ArrayList<>();
List<Application> applications = applicationService.getByTeamId(variable.getTeamId());
- Map<Long, Application> applicationMap = applications.stream().collect(Collectors.toMap(Application::getId, application -> application));
+ Map<Long, Application> applicationMap =
+ applications.stream().collect(Collectors.toMap(Application::getId, application -> application));
// Get applications that depend on this variable in application args
if (applications != null) {
@@ -206,8 +212,9 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
/**
* Determine whether variableCode is dependent on mixed.
+ *
* @param variableCode Variable code, e.g. "kafka.cluster"
- * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
+ * @param mixed Text with placeholders, e.g. "--cluster ${kafka.cluster}"
* @return If mixed can match the variableCode, return true, otherwise return false
*/
private boolean isDepend(String variableCode, String mixed) {
@@ -224,6 +231,8 @@ public class VariableServiceImpl extends ServiceImpl<VariableMapper, Variable> i
@Override
public long countByTeamId(Long teamId) {
- return this.count(new LambdaQueryWrapper<Variable>().eq(Variable::getTeamId, teamId));
+ LambdaQueryWrapper<Variable> queryWrapper = new LambdaQueryWrapper<Variable>()
+ .eq(Variable::getTeamId, teamId);
+ return this.count(queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
index b738058e6..3410ab256 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
@@ -64,16 +64,16 @@ public class ProjectBuildTask extends AbstractLogFileTask {
boolean cloneSuccess = cloneSourceCode(project);
if (!cloneSuccess) {
fileLogger.error("[StreamPark] clone or pull error.");
- this.baseMapper.failureBuild(project);
+ this.baseMapper.updateFailureBuildById(project);
return;
}
boolean build = projectBuild(project);
if (!build) {
- this.baseMapper.failureBuild(project);
+ this.baseMapper.updateFailureBuildById(project);
fileLogger.error("build error, project name: {} ", project.getName());
return;
}
- this.baseMapper.successBuild(project);
+ this.baseMapper.updateSuccessBuildById(project);
this.deploy(project);
List<Application> applications = this.applicationService.getByProjectId(project.getId());
// Update the deploy state
@@ -87,7 +87,7 @@ public class ProjectBuildTask extends AbstractLogFileTask {
@Override
protected void processException(Throwable t) {
- this.baseMapper.failureBuild(project);
+ this.baseMapper.updateFailureBuildById(project);
fileLogger.error("Build error, project name: {}", project.getName(), t);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index e53974052..153870fe0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -71,7 +71,9 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, Member>
@Override
public void deleteByTeamId(Long teamId) {
- this.remove(new LambdaQueryWrapper<Member>().eq(Member::getTeamId, teamId));
+ LambdaQueryWrapper<Member> queryWrapper = new LambdaQueryWrapper<Member>()
+ .eq(Member::getTeamId, teamId);
+ this.remove(queryWrapper);
}
@Override
@@ -99,16 +101,16 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, Member>
private Member findByUserId(Long teamId, Long userId) {
AssertUtils.isTrue(teamId != null, "The team id is required.");
- return baseMapper.selectOne(
- new LambdaQueryWrapper<Member>().eq(Member::getTeamId, teamId)
- .eq(Member::getUserId, userId));
+ LambdaQueryWrapper<Member> queryWrapper = new LambdaQueryWrapper<Member>().eq(Member::getTeamId, teamId)
+ .eq(Member::getUserId, userId);
+ return baseMapper.selectOne(queryWrapper);
}
@Override
public List<Long> findUserIdsByRoleId(Long roleId) {
- List<Member> list =
- baseMapper.selectList(
- new LambdaQueryWrapper<Member>().eq(Member::getRoleId, roleId));
+ LambdaQueryWrapper<Member> queryWrapper = new LambdaQueryWrapper<Member>()
+ .eq(Member::getRoleId, roleId);
+ List<Member> list = baseMapper.selectList(queryWrapper);
return list.stream()
.map(Member::getUserId)
.collect(Collectors.toList());
@@ -123,7 +125,8 @@ public class MemberServiceImpl extends ServiceImpl<MemberMapper, Member>
Team team = Optional.ofNullable(teamService.getById(member.getTeamId()))
.orElseThrow(() -> new IllegalArgumentException(String.format("The teamId [%s] not found", member.getTeamId())));
AssertUtils.isTrue(findByUserId(member.getTeamId(), user.getUserId()) == null,
- String.format("The user [%s] has been added the team [%s], please don't add it again.", member.getUserName(), team.getTeamName()));
+ String.format("The user [%s] has been added the team [%s], please don't add it again.", member.getUserName(),
+ team.getTeamName()));
member.setId(null);
member.setUserId(user.getUserId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
index 1127f124b..b169e2c11 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MenuServiceImpl.java
@@ -75,7 +75,10 @@ public class MenuServiceImpl extends ServiceImpl<MenuMapper, Menu> implements Me
.orElseThrow(() -> new IllegalArgumentException(String.format("The userId:[%s] not found", userId)));
// Admin has the permission for all menus.
if (UserType.ADMIN.equals(user.getUserType())) {
- return this.list(new LambdaQueryWrapper<Menu>().eq(Menu::getType, "0").orderByAsc(Menu::getOrderNum));
+ LambdaQueryWrapper<Menu> queryWrapper = new LambdaQueryWrapper<Menu>()
+ .eq(Menu::getType, "0")
+ .orderByAsc(Menu::getOrderNum);
+ return this.list(queryWrapper);
}
return this.baseMapper.findUserMenus(userId, teamId);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java
index 56fb313fd..8a2968731 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleMenuServiceImpl.java
@@ -38,18 +38,23 @@ public class RoleMenuServiceImpl extends ServiceImpl<RoleMenuMapper, RoleMenu>
@Override
@Transactional
public void deleteByRoleId(Long roleId) {
- baseMapper.delete(new LambdaQueryWrapper<RoleMenu>().eq(RoleMenu::getRoleId, roleId));
+ LambdaQueryWrapper<RoleMenu> queryWrapper = new LambdaQueryWrapper<RoleMenu>()
+ .eq(RoleMenu::getRoleId, roleId);
+ baseMapper.delete(queryWrapper);
}
@Override
@Transactional
public void deleteByMenuId(String[] menuIds) {
List<String> list = Arrays.asList(menuIds);
- baseMapper.delete(new LambdaQueryWrapper<RoleMenu>().in(RoleMenu::getMenuId, list));
+ LambdaQueryWrapper<RoleMenu> queryWrapper = new LambdaQueryWrapper<RoleMenu>()
+ .in(RoleMenu::getMenuId, list);
+ baseMapper.delete(queryWrapper);
}
@Override
public List<RoleMenu> getByRoleId(String roleId) {
- return baseMapper.selectList(new LambdaQueryWrapper<RoleMenu>().eq(RoleMenu::getRoleId, roleId));
+ LambdaQueryWrapper<RoleMenu> queryWrapper = new LambdaQueryWrapper<RoleMenu>().eq(RoleMenu::getRoleId, roleId);
+ return baseMapper.selectList(queryWrapper);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleServiceImpl.java
index 7b7ac411f..a3fba3602 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/RoleServiceImpl.java
@@ -96,8 +96,10 @@ public class RoleServiceImpl extends ServiceImpl<RoleMapper, Role> implements Ro
public void updateRole(Role role) {
role.setModifyTime(new Date());
baseMapper.updateById(role);
- roleMenuMapper.delete(
- new LambdaQueryWrapper<RoleMenu>().eq(RoleMenu::getRoleId, role.getRoleId()));
+ LambdaQueryWrapper<RoleMenu> queryWrapper = new LambdaQueryWrapper<RoleMenu>()
+ .eq(RoleMenu::getRoleId, role.getRoleId());
+ roleMenuMapper.delete(queryWrapper);
+
String menuId = role.getMenuId();
if (StringUtils.contains(menuId, Constant.APP_DETAIL_MENU_ID) && !StringUtils.contains(menuId, Constant.APP_MENU_ID)) {
menuId = menuId + StringPool.COMMA + Constant.APP_MENU_ID;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
index e7e3b5683..59fe76a5c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
@@ -79,7 +79,9 @@ public class TeamServiceImpl extends ServiceImpl<TeamMapper, Team> implements Te
@Override
public Team findByName(String teamName) {
- return baseMapper.selectOne(new LambdaQueryWrapper<Team>().eq(Team::getTeamName, teamName));
+ LambdaQueryWrapper<Team> queryWrapper = new LambdaQueryWrapper<Team>()
+ .eq(Team::getTeamName, teamName);
+ return baseMapper.selectOne(queryWrapper);
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 185b89996..6399cb64f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -64,7 +64,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
@Override
public User findByName(String username) {
- return baseMapper.selectOne(new LambdaQueryWrapper<User>().eq(User::getUsername, username));
+ LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<User>()
+ .eq(User::getUsername, username);
+ return baseMapper.selectOne(queryWrapper);
}
@Override
@@ -86,7 +88,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
public void updateLoginTime(String username) {
User user = new User();
user.setLastLoginTime(new Date());
- this.baseMapper.update(user, new LambdaQueryWrapper<User>().eq(User::getUsername, username));
+ LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<User>()
+ .eq(User::getUsername, username);
+ this.baseMapper.update(user, queryWrapper);
}
@Override
@@ -128,7 +132,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
public void updateAvatar(String username, String avatar) {
User user = new User();
user.setAvatar(avatar);
- this.baseMapper.update(user, new LambdaQueryWrapper<User>().eq(User::getUsername, username));
+ LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<User>()
+ .eq(User::getUsername, username);
+ this.baseMapper.update(user, queryWrapper);
}
@Override
@@ -139,7 +145,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
password = ShaHashUtils.encrypt(salt, password);
user.setSalt(salt);
user.setPassword(password);
- this.baseMapper.update(user, new LambdaQueryWrapper<User>().eq(User::getUsername, username));
+ LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<User>()
+ .eq(User::getUsername, username);
+ this.baseMapper.update(user, queryWrapper);
}
@Override
@@ -151,7 +159,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
String password = ShaHashUtils.encrypt(salt, User.DEFAULT_PASSWORD);
user.setSalt(salt);
user.setPassword(password);
- this.baseMapper.update(user, new LambdaQueryWrapper<User>().eq(User::getUsername, username));
+ LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<User>()
+ .eq(User::getUsername, username);
+ this.baseMapper.update(user, queryWrapper);
}
}
@@ -200,7 +210,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
/**
* generate user info, contains: 1.token, 2.vue router, 3.role, 4.permission, 5.personalized config info of frontend
*
- * @param user user
+ * @param user user
* @return UserInfo
*/
@Override
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml
index 1e91c795d..4eae46e65 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/AlertConfigMapper.xml
@@ -33,7 +33,7 @@
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
</resultMap>
- <select id="getAlertConfByName" resultType="alertConfig" parameterType="alertConfig">
+ <select id="getAlertConfByName" resultType="org.apache.streampark.console.core.entity.AlertConfig" parameterType="org.apache.streampark.console.core.entity.AlertConfig">
SELECT id
FROM t_alert_config t
where t.alert_name = #{alertConfig.alertName} limit 1;
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationConfigMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationConfigMapper.xml
index cd19eb9c1..e606a3cd4 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationConfigMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationConfigMapper.xml
@@ -29,4 +29,19 @@
<result column="content" jdbcType="LONGVARCHAR" property="content"/>
<result column="create_time" jdbcType="DATE" property="createTime"/>
</resultMap>
+
+ <select id="getLastVersion" resultType="java.lang.Integer" parameterType="java.lang.Long">
+ select max(`version`) as lastVersion
+ from t_flink_config
+ where app_id = #{appId}
+ </select>
+
+ <select id="getEffective" resultType="org.apache.streampark.console.core.entity.ApplicationConfig" parameterType="java.lang.Long">
+ select s.*
+ from t_flink_config s
+ inner join t_flink_effective e
+ on s.id = e.target_id
+ where e.app_id = #{appId}
+ and e.target_type = 1
+ </select>
</mapper>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 9d3cd3c15..117d57bfe 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -79,7 +79,7 @@
<result column="tags" jdbcType="VARCHAR" property="tags"/>
</resultMap>
- <select id="page" resultType="application" parameterType="application">
+ <select id="page" resultType="org.apache.streampark.console.core.entity.Application" parameterType="org.apache.streampark.console.core.entity.Application">
select t.*,
p.name as projectName,
u.username,
@@ -137,22 +137,14 @@
</where>
</select>
- <select id="getApp" resultType="application" parameterType="application">
+ <select id="getApp" resultType="org.apache.streampark.console.core.entity.Application" parameterType="org.apache.streampark.console.core.entity.Application">
select t.*, p.name as projectName
from t_flink_app t left join t_flink_project p
on t.project_id = p.id
where t.id = #{application.id}
</select>
- <select id="getByTeamId" resultType="application" parameterType="Long">
- select t.*, u.username, case when trim(u.nick_name) = '' then u.username else u.nick_name end as nick_name
- from t_flink_app t
- inner join t_user u
- on t.user_id = u.user_id
- where t.team_id=#{teamId}
- </select>
-
- <update id="updateTracking" parameterType="application">
+ <update id="updateTracking" parameterType="org.apache.streampark.console.core.entity.Application">
update t_flink_app
<set>
<if test="application.jobId != null and application.jobId != ''">
@@ -201,7 +193,15 @@
where id=#{application.id}
</update>
- <update id="mapping" parameterType="application">
+ <select id="getByTeamId" resultType="org.apache.streampark.console.core.entity.Application" parameterType="java.lang.Long">
+ select t.*, u.username, case when trim(u.nick_name) = '' then u.username else u.nick_name end as nick_name
+ from t_flink_app t
+ inner join t_user u
+ on t.user_id = u.user_id
+ where t.team_id=#{teamId}
+ </select>
+
+ <update id="mapping" parameterType="org.apache.streampark.console.core.entity.Application">
update t_flink_app
<set>
<if test="application.jobId != null">
@@ -216,4 +216,96 @@
where id=#{application.id}
</update>
+ <select id="getRecentK8sNamespace" resultType="java.lang.String" parameterType="java.lang.Integer">
+ select k8s_namespace
+ from (
+ select
+ k8s_namespace,
+ max(create_time) as ct
+ from t_flink_app
+ where k8s_namespace is not null
+ group by k8s_namespace
+ order by ct desc
+ ) as ns
+ limit #{limitSize}
+ </select>
+
+ <select id="getRecentK8sClusterId" resultType="java.lang.String" parameterType="java.util.Map">
+ select cluster_id
+ from (
+ select
+ cluster_id,
+ max(create_time) as ct
+ from t_flink_app
+ where cluster_id is not null
+ and execution_mode = #{executionMode}
+ group by cluster_id
+ order by ct desc
+ ) as ci
+ limit #{limitSize}
+ </select>
+
+ <select id="getRecentFlinkBaseImage" resultType="java.lang.String" parameterType="java.lang.Integer">
+ select flink_image
+ from (
+ select
+ flink_image,
+ max(create_time) as ct
+ from t_flink_app
+ where flink_image is not null
+ and execution_mode = 6
+ group by flink_image
+ order by ct desc
+ ) as fi
+ limit #{limitSize}
+ </select>
+
+ <select id="getRecentK8sPodTemplate" resultType="java.lang.String" parameterType="java.lang.Integer">
+ select k8s_pod_template
+ from (
+ select
+ k8s_pod_template,
+ max(create_time) as ct
+ from t_flink_app
+ where k8s_pod_template is not null
+ and k8s_pod_template !=''
+ and execution_mode = 6
+ group by k8s_pod_template
+ order by ct desc
+ ) as pt
+ limit #{limitSize}
+ </select>
+
+ <select id="getRecentK8sJmPodTemplate" resultType="java.lang.String" parameterType="java.lang.Integer">
+ select k8s_jm_pod_template
+ from (
+ select
+ k8s_jm_pod_template,
+ max(create_time) as ct
+ from t_flink_app
+ where k8s_jm_pod_template is not null
+ and k8s_jm_pod_template != ''
+ and execution_mode = 6
+ group by k8s_jm_pod_template
+ order by ct desc
+ ) as pt
+ limit #{limitSize}
+ </select>
+
+ <select id="getRecentK8sTmPodTemplate" resultType="java.lang.String" parameterType="java.lang.Integer">
+ select k8s_tm_pod_template
+ from (
+ select
+ k8s_tm_pod_template,
+ max(create_time) as ct
+ from t_flink_app
+ where k8s_tm_pod_template is not null
+ and k8s_tm_pod_template != ''
+ and execution_mode = 6
+ group by k8s_tm_pod_template
+ order by ct desc
+ ) as pt
+ limit #{limitSize}
+ </select>
+
</mapper>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkEnvMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkEnvMapper.xml
index 2173b04f7..0c4c505ba 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkEnvMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkEnvMapper.xml
@@ -30,4 +30,20 @@
<result column="is_default" jdbcType="BOOLEAN" property="isDefault"/>
<result column="create_time" jdbcType="DATE" property="createTime"/>
</resultMap>
+
+ <select id="getByAppId" resultType="org.apache.streampark.console.core.entity.FlinkEnv" parameterType="java.lang.Long">
+ select v.*
+ from t_flink_env v
+ inner join (
+ select version_id
+ from t_flink_app
+ where id = #{appId}
+ ) as t
+ on v.id = t.version_id
+ </select>
+
+ <update id="setDefault" parameterType="java.lang.Long">
+ update t_flink_env
+ set is_default = case id when #{id} then true else false end
+ </update>
</mapper>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
index 5c394a3de..ba658907b 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkSqlMapper.xml
@@ -28,7 +28,22 @@
<result column="create_time" jdbcType="DATE" property="createTime"/>
</resultMap>
- <select id="getByTeamId" resultType="flinkSql" parameterType="Long">
+ <select id="getEffective" resultType="org.apache.streampark.console.core.entity.FlinkSql" parameterType="java.lang.Long">
+ select s.*
+ from t_flink_sql s
+ inner join t_flink_effective e
+ on s.id = e.target_id
+ where e.target_type = 2
+ and e.app_id = #{appId}
+ </select>
+
+ <select id="getLatestVersion" resultType="java.lang.Integer" parameterType="java.lang.Long">
+ select max(`version`) as maxVersion
+ from t_flink_sql
+ where app_id = #{appId}
+ </select>
+
+ <select id="getByTeamId" resultType="org.apache.streampark.console.core.entity.FlinkSql" parameterType="java.lang.Long">
select s.*
from t_flink_sql s, t_flink_app a
where s.app_id = a.id
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ProjectMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ProjectMapper.xml
index caf238727..520a0b31d 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ProjectMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ProjectMapper.xml
@@ -39,17 +39,21 @@
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
</resultMap>
- <select id="page" resultType="project" parameterType="project">
- select * from t_flink_project t
- <where>
- t.team_id = #{project.teamId}
- <if test="project.name != null and project.name != ''">
- and t.name like '%${project.name}%'
- </if>
- <if test="project.buildState != null">
- and t.build_state = #{project.buildState}
- </if>
- </where>
- </select>
+ <update id="updateFailureBuildById" parameterType="org.apache.streampark.console.core.entity.Project">
+ update t_flink_project
+ set BUILD_STATE=2
+ where id=#{project.id}
+ </update>
+ <update id="updateSuccessBuildById" parameterType="org.apache.streampark.console.core.entity.Project">
+ update t_flink_project
+ set LAST_BUILD=now(), BUILD_STATE=1
+ where id=#{project.id}
+ </update>
+
+ <update id="updateStartBuildById" parameterType="org.apache.streampark.console.core.entity.Project">
+ update t_flink_project
+ set BUILD_STATE=0
+ where id=#{project.id}
+ </update>
</mapper>
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/VariableMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/VariableMapper.xml
index c9b09e44f..fa6f640b6 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/VariableMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/VariableMapper.xml
@@ -31,7 +31,7 @@
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
</resultMap>
- <select id="page" resultType="variable" parameterType="variable">
+ <select id="page" resultType="org.apache.streampark.console.core.entity.Variable" parameterType="org.apache.streampark.console.core.entity.Variable">
select v.*, u.username as creatorName
from t_variable v, t_user u
<where>
@@ -46,7 +46,7 @@
</where>
</select>
- <select id="selectByTeamId" resultType="variable">
+ <select id="selectByTeamId" resultType="org.apache.streampark.console.core.entity.Variable">
select *
from t_variable
<where>