Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/16 15:34:27 UTC

[incubator-streampark] branch dev updated: [Improve] Replace the assert with checkState (#1624)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new aa7240cf2 [Improve] Replace the assert with checkState (#1624)
aa7240cf2 is described below

commit aa7240cf2f11a7610a4dcf0c067e346b09005236
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Fri Sep 16 23:34:20 2022 +0800

    [Improve] Replace the assert with checkState (#1624)
---
 .../console/core/runner/EnvInitializer.java        |  2 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |  5 +-
 .../service/impl/ApplicationBackUpServiceImpl.java |  3 +-
 .../core/service/impl/ApplicationServiceImpl.java  | 57 +++++++++++++---------
 .../core/service/impl/FlinkSqlServiceImpl.java     |  5 +-
 .../core/service/impl/ProjectServiceImpl.java      |  9 ++--
 .../core/service/impl/SavePointServiceImpl.java    |  3 +-
 .../core/service/impl/SqlCompleteServiceImpl.java  |  3 +-
 .../console/core/task/FlinkTrackingTask.java       |  3 +-
 .../system/service/impl/UserServiceImpl.java       |  3 +-
 10 files changed, 56 insertions(+), 37 deletions(-)

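Context for the change: a plain Java assert statement is only evaluated when the JVM is launched with -ea (assertions are disabled by default), so in a typical production deployment the checks being replaced in this patch were silently skipped. A checkState-style call runs unconditionally and fails fast with an unchecked exception. The sketch below is a minimal, hypothetical illustration of that pattern; it is not StreamPark's actual org.apache.streampark.common.util.AssertUtils, and only the state(condition) and state(condition, message) call shapes are taken from the diff itself.

    // Minimal, hypothetical checkState-style helper, for illustration only.
    // StreamPark's real AssertUtils may differ in name, message handling, and
    // exception type; the two call shapes below mirror the usages in this diff.
    public final class StateChecks {

        private StateChecks() {
        }

        public static void state(boolean expression) {
            // Evaluated on every run, unlike "assert expression;", which the
            // JVM skips unless it was started with -ea / -enableassertions.
            state(expression, "state check failed");
        }

        public static void state(boolean expression, String message) {
            if (!expression) {
                throw new IllegalStateException(message);
            }
        }
    }

With a helper like this, a call such as state(cluster != null, "The clusterId=... cannot be found") fails immediately with a descriptive message even on JVMs where assertions are disabled, instead of continuing and surfacing later as a NullPointerException.
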
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 3b2ca6ad5..f6c4fd6a3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -96,7 +96,7 @@ public class EnvInitializer implements ApplicationRunner {
             .filter(springEnv::containsProperty)
             .forEach(key -> {
                 InternalOption config = InternalConfigHolder.getConfig(key);
-                assert config != null;
+                AssertUtils.state(config != null);
                 InternalConfigHolder.set(config, springEnv.getProperty(key, config.classType()));
             });
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1ab418f22..06ee06a22 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -23,6 +23,7 @@ import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -150,7 +151,7 @@ public class AppBuildPipeServiceImpl
         FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
         if (app.isFlinkSqlJob()) {
             FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
-            assert flinkSql != null;
+            AssertUtils.state(flinkSql != null);
             app.setDependency(flinkSql.getDependency());
         }
 
@@ -207,7 +208,7 @@ public class AppBuildPipeServiceImpl
                         //copy jar to local upload dir
                         for (String jar : app.getDependencyObject().getJar()) {
                             File localJar = new File(WebUtils.getAppTempDir(), jar);
-                            assert localJar.exists();
+                            AssertUtils.state(localJar.exists());
                             String localUploads = Workspace.local().APP_UPLOADS();
                             String uploadJar = localUploads.concat("/").concat(jar);
                             checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploads);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
index 86f49b3a1..5e0e60775 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.InternalException;
@@ -192,7 +193,7 @@ public class ApplicationBackUpServiceImpl
     @Override
     public void rollbackFlinkSql(Application application, FlinkSql sql) {
         ApplicationBackUp backUp = getFlinkSqlBackup(application.getId(), sql.getId());
-        assert backUp != null;
+        AssertUtils.state(backUp != null);
         try {
             FlinkTrackingTask.refreshTracking(backUp.getAppId(), () -> {
                 // rollback config and sql
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 980c9a0d8..b4ebbf656 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
@@ -205,9 +206,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         this.baseMapper.resetOptionState();
     }
 
-    private volatile Map<Long, CompletableFuture> startFutureMap = new ConcurrentHashMap<>();
+    private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap = new ConcurrentHashMap<>();
 
-    private volatile Map<Long, CompletableFuture> cancelFutureMap = new ConcurrentHashMap<>();
+    private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap = new ConcurrentHashMap<>();
 
     @Override
     public Map<String, Serializable> dashboard() {
@@ -324,7 +325,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     @Override
     public void revoke(Application appParma) throws ApplicationException {
         Application application = getById(appParma.getId());
-        assert application != null;
+        AssertUtils.state(application != null,
+            String.format("The application id=%s cannot be find in the database.",
+                appParma.getId()));
 
         //1) delete files that have been published to workspace
         application.getFsOperator().delete(application.getAppHome());
@@ -434,10 +437,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             return false;
         }
         Long useId = FlinkTrackingTask.getCanceledJobUserId(appId);
-        if (useId == null || application.getUserId().longValue() != FlinkTrackingTask.getCanceledJobUserId(appId).longValue()) {
-            return true;
-        }
-        return false;
+        return useId == null || application.getUserId().longValue() != FlinkTrackingTask.getCanceledJobUserId(appId).longValue();
     }
 
     private void removeApp(Application application) {
@@ -589,7 +589,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             if (appParam.getConfig() != null) {
                 configService.create(appParam, true);
             }
-            assert appParam.getId() != null;
+            AssertUtils.state(appParam.getId() != null);
             return true;
         }
         return false;
@@ -675,7 +675,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 configService.save(config);
                 configService.setLatestOrEffective(true, config.getId(), newApp.getId());
             }
-            assert newApp.getId() != null;
+            AssertUtils.state(newApp.getId() != null);
             return newApp.getId();
         }
         return 0L;
@@ -794,7 +794,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         } else {
             // get previous flink sql and decode
             FlinkSql copySourceFlinkSql = flinkSqlService.getById(appParam.getSqlId());
-            assert copySourceFlinkSql != null;
+            AssertUtils.state(copySourceFlinkSql != null);
             copySourceFlinkSql.decode();
 
             // get submit flink sql
@@ -895,8 +895,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     @Override
     public void forcedStop(Application app) {
-        CompletableFuture startFuture = startFutureMap.remove(app.getId());
-        CompletableFuture cancelFuture = cancelFutureMap.remove(app.getId());
+        CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(app.getId());
+        CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(app.getId());
         if (startFuture != null) {
             startFuture.cancel(true);
         }
@@ -1036,7 +1036,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
             FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-            assert cluster != null;
+            AssertUtils.state(cluster != null,
+                String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+                        "the cluster has been deleted. Please contact the Admin.",
+                    application.getFlinkClusterId()));
             URI activeAddress = cluster.getActiveAddress();
             extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
             extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
@@ -1049,7 +1052,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
             if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
                 FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-                assert cluster != null;
+                AssertUtils.state(cluster != null,
+                    String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+                        "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
                 extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
             }
         }
@@ -1172,7 +1177,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     @RefreshCache
     public void starting(Application appParam) {
         Application application = getById(appParam.getId());
-        assert application != null;
+        AssertUtils.state(application != null);
         application.setState(FlinkAppState.STARTING.getValue());
         application.setOptionTime(new Date());
         updateById(application);
@@ -1185,7 +1190,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         final Application application = getById(appParam.getId());
 
-        assert application != null;
+        AssertUtils.state(application != null);
 
         application.setJobId(new JobID().toHexString());
         // if manually started, clear the restart flag
@@ -1210,7 +1215,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         ApplicationConfig applicationConfig = configService.getEffective(application.getId());
         ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
-        assert executionMode != null;
+        AssertUtils.state(executionMode != null);
         if (application.isCustomCodeJob()) {
             if (application.isUploadJob()) {
                 appConf = String.format("json://{\"%s\":\"%s\"}",
@@ -1245,7 +1250,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
         } else if (application.isFlinkSqlJob()) {
             FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
-            assert flinkSql != null;
+            AssertUtils.state(flinkSql != null);
             // 1) dist_userJar
             FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
             String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
@@ -1272,7 +1277,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
             FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-            assert cluster != null;
+            AssertUtils.state(cluster != null,
+                String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+                    "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
             URI activeAddress = cluster.getActiveAddress();
             extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
             extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
@@ -1285,7 +1292,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             }
             if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
                 FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-                assert cluster != null;
+                AssertUtils.state(cluster != null,
+                    String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+                        "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
                 extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
             }
         }
@@ -1309,14 +1318,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
 
-        assert buildPipeline != null;
+        AssertUtils.state(buildPipeline != null);
 
         BuildResult buildResult = buildPipeline.getBuildResult();
         if (executionMode.equals(ExecutionMode.YARN_APPLICATION)) {
             buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
-                assert buildResult != null;
+                AssertUtils.state(buildResult != null);
                 DockerImageBuildResponse result = buildResult.as(DockerImageBuildResponse.class);
                 String ingressTemplates = application.getIngressTemplate();
                 String domainName = application.getDefaultModeIngress();
@@ -1484,7 +1493,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             // 3.1) At the remote mode, request the flink webui interface to get the savepoint path
             if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
                 FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-                assert cluster != null;
+                AssertUtils.state(cluster != null,
+                    String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+                        "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
                 Map<String, String> config = cluster.getFlinkConfig();
                 if (!config.isEmpty()) {
                     savepointPath = config.get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 8c13e27fe..5526cf54d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.console.core.entity.Application;
@@ -168,11 +169,11 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
     @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
     public void rollback(Application application) {
         FlinkSql sql = getCandidate(application.getId(), CandidateType.HISTORY);
-        assert sql != null;
+        AssertUtils.state(sql != null);
         try {
             // check and backup current job
             FlinkSql effectiveSql = getEffective(application.getId(), false);
-            assert effectiveSql != null;
+            AssertUtils.state(effectiveSql != null);
             // rollback history sql
             backUpService.rollbackFlinkSql(application, sql);
         } catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 5dcce2041..a03e134c7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CommandUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
@@ -71,7 +72,7 @@ import java.util.concurrent.TimeUnit;
 public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
     implements ProjectService {
 
-    private volatile Map<Long, Byte> tailOutMap = new ConcurrentHashMap<>();
+    private final Map<Long, Byte> tailOutMap = new ConcurrentHashMap<>();
 
     private final Map<Long, StringBuilder> tailBuffer = new ConcurrentHashMap<>();
 
@@ -114,7 +115,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
     public boolean update(Project projectParam) {
         try {
             Project project = getById(projectParam.getId());
-            assert project != null;
+            AssertUtils.state(project != null);
             project.setName(projectParam.getName());
             project.setUrl(projectParam.getUrl());
             project.setBranches(projectParam.getBranches());
@@ -147,7 +148,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
     @Transactional(rollbackFor = {Exception.class})
     public boolean delete(Long id) {
         Project project = getById(id);
-        assert project != null;
+        AssertUtils.state(project != null);
         LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>();
         queryWrapper.eq(Application::getProjectId, id);
         long count = applicationService.count(queryWrapper);
@@ -349,7 +350,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
             }
             List<Map<String, Object>> list = new ArrayList<>();
             File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
-            assert files != null;
+            AssertUtils.state(files != null);
             for (File item : files) {
                 eachFile(item, list, true);
             }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index e3138fce3..7000a00fd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.Constant;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.InternalException;
@@ -62,7 +63,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
 
     private void expire(SavePoint entity) {
         FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
-        assert flinkEnv != null;
+        AssertUtils.state(flinkEnv != null);
         int cpThreshold = Integer.parseInt(
             flinkEnv.convertFlinkYamlAsMap()
                 .getOrDefault("state.checkpoints.num-retained", "5")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index 94e3e99bd..04072869c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.core.service.SqlCompleteService;
 
 import com.google.common.collect.Sets;
@@ -183,7 +184,7 @@ public class SqlCompleteServiceImpl implements SqlCompleteService {
                 nowStep = nowStep.get(nowChar).getNext();
                 loc += 1;
             }
-            assert preNode != null;
+            AssertUtils.state(preNode != null);
             preNode.setStop();
             preNode.setCount(count);
         }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 95fa66a51..4c7f1ab79 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.task;
 import static org.apache.streampark.common.enums.ExecutionMode.isKubernetesMode;
 
 import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.HttpClientUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.YarnUtils;
@@ -207,7 +208,7 @@ public class FlinkTrackingTask {
                     final OptionState optionState = OPTIONING.get(key);
                     try {
                         // query status from flink rest api
-                        assert application.getId() != null;
+                        AssertUtils.state(application.getId() != null);
                         getFromFlinkRestApi(application, stopFrom);
                     } catch (Exception flinkException) {
                         // query status from yarn rest api
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index ab1bdc9c5..564137c59 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.system.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.util.ShaHashUtils;
 import org.apache.streampark.console.system.entity.Menu;
@@ -84,7 +85,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
             });
             resPage.setRecords(users);
         }
-        assert resPage != null;
+        AssertUtils.state(resPage != null);
         if (resPage.getTotal() == 0) {
             resPage.setRecords(Collections.emptyList());
         }