You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/16 15:34:27 UTC
[incubator-streampark] branch dev updated: [Improve] Replace the assert with checkState (#1624)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new aa7240cf2 [Improve] Replace the assert with checkState (#1624)
aa7240cf2 is described below
commit aa7240cf2f11a7610a4dcf0c067e346b09005236
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Fri Sep 16 23:34:20 2022 +0800
[Improve] Replace the assert with checkState (#1624)
---
.../console/core/runner/EnvInitializer.java | 2 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 5 +-
.../service/impl/ApplicationBackUpServiceImpl.java | 3 +-
.../core/service/impl/ApplicationServiceImpl.java | 57 +++++++++++++---------
.../core/service/impl/FlinkSqlServiceImpl.java | 5 +-
.../core/service/impl/ProjectServiceImpl.java | 9 ++--
.../core/service/impl/SavePointServiceImpl.java | 3 +-
.../core/service/impl/SqlCompleteServiceImpl.java | 3 +-
.../console/core/task/FlinkTrackingTask.java | 3 +-
.../system/service/impl/UserServiceImpl.java | 3 +-
10 files changed, 56 insertions(+), 37 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 3b2ca6ad5..f6c4fd6a3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -96,7 +96,7 @@ public class EnvInitializer implements ApplicationRunner {
.filter(springEnv::containsProperty)
.forEach(key -> {
InternalOption config = InternalConfigHolder.getConfig(key);
- assert config != null;
+ AssertUtils.state(config != null);
InternalConfigHolder.set(config, springEnv.getProperty(key, config.classType()));
});
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1ab418f22..06ee06a22 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -23,6 +23,7 @@ import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
@@ -150,7 +151,7 @@ public class AppBuildPipeServiceImpl
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
if (app.isFlinkSqlJob()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
- assert flinkSql != null;
+ AssertUtils.state(flinkSql != null);
app.setDependency(flinkSql.getDependency());
}
@@ -207,7 +208,7 @@ public class AppBuildPipeServiceImpl
//copy jar to local upload dir
for (String jar : app.getDependencyObject().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
- assert localJar.exists();
+ AssertUtils.state(localJar.exists());
String localUploads = Workspace.local().APP_UPLOADS();
String uploadJar = localUploads.concat("/").concat(jar);
checkOrElseUploadJar(FsOperator.lfs(), localJar, uploadJar, localUploads);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
index 86f49b3a1..5e0e60775 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.InternalException;
@@ -192,7 +193,7 @@ public class ApplicationBackUpServiceImpl
@Override
public void rollbackFlinkSql(Application application, FlinkSql sql) {
ApplicationBackUp backUp = getFlinkSqlBackup(application.getId(), sql.getId());
- assert backUp != null;
+ AssertUtils.state(backUp != null);
try {
FlinkTrackingTask.refreshTracking(backUp.getAppId(), () -> {
// rollback config and sql
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 980c9a0d8..b4ebbf656 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
@@ -205,9 +206,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
this.baseMapper.resetOptionState();
}
- private volatile Map<Long, CompletableFuture> startFutureMap = new ConcurrentHashMap<>();
+ private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap = new ConcurrentHashMap<>();
- private volatile Map<Long, CompletableFuture> cancelFutureMap = new ConcurrentHashMap<>();
+ private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap = new ConcurrentHashMap<>();
@Override
public Map<String, Serializable> dashboard() {
@@ -324,7 +325,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public void revoke(Application appParma) throws ApplicationException {
Application application = getById(appParma.getId());
- assert application != null;
+ AssertUtils.state(application != null,
+ String.format("The application id=%s cannot be find in the database.",
+ appParma.getId()));
//1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
@@ -434,10 +437,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return false;
}
Long useId = FlinkTrackingTask.getCanceledJobUserId(appId);
- if (useId == null || application.getUserId().longValue() != FlinkTrackingTask.getCanceledJobUserId(appId).longValue()) {
- return true;
- }
- return false;
+ return useId == null || application.getUserId().longValue() != FlinkTrackingTask.getCanceledJobUserId(appId).longValue();
}
private void removeApp(Application application) {
@@ -589,7 +589,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (appParam.getConfig() != null) {
configService.create(appParam, true);
}
- assert appParam.getId() != null;
+ AssertUtils.state(appParam.getId() != null);
return true;
}
return false;
@@ -675,7 +675,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
configService.save(config);
configService.setLatestOrEffective(true, config.getId(), newApp.getId());
}
- assert newApp.getId() != null;
+ AssertUtils.state(newApp.getId() != null);
return newApp.getId();
}
return 0L;
@@ -794,7 +794,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
} else {
// get previous flink sql and decode
FlinkSql copySourceFlinkSql = flinkSqlService.getById(appParam.getSqlId());
- assert copySourceFlinkSql != null;
+ AssertUtils.state(copySourceFlinkSql != null);
copySourceFlinkSql.decode();
// get submit flink sql
@@ -895,8 +895,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@Override
public void forcedStop(Application app) {
- CompletableFuture startFuture = startFutureMap.remove(app.getId());
- CompletableFuture cancelFuture = cancelFutureMap.remove(app.getId());
+ CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(app.getId());
+ CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(app.getId());
if (startFuture != null) {
startFuture.cancel(true);
}
@@ -1036,7 +1036,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- assert cluster != null;
+ AssertUtils.state(cluster != null,
+ String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.",
+ application.getFlinkClusterId()));
URI activeAddress = cluster.getActiveAddress();
extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
@@ -1049,7 +1052,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- assert cluster != null;
+ AssertUtils.state(cluster != null,
+ String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
}
}
@@ -1172,7 +1177,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
@RefreshCache
public void starting(Application appParam) {
Application application = getById(appParam.getId());
- assert application != null;
+ AssertUtils.state(application != null);
application.setState(FlinkAppState.STARTING.getValue());
application.setOptionTime(new Date());
updateById(application);
@@ -1185,7 +1190,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
final Application application = getById(appParam.getId());
- assert application != null;
+ AssertUtils.state(application != null);
application.setJobId(new JobID().toHexString());
// if manually started, clear the restart flag
@@ -1210,7 +1215,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
ApplicationConfig applicationConfig = configService.getEffective(application.getId());
ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
- assert executionMode != null;
+ AssertUtils.state(executionMode != null);
if (application.isCustomCodeJob()) {
if (application.isUploadJob()) {
appConf = String.format("json://{\"%s\":\"%s\"}",
@@ -1245,7 +1250,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
} else if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
- assert flinkSql != null;
+ AssertUtils.state(flinkSql != null);
// 1) dist_userJar
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
@@ -1272,7 +1277,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- assert cluster != null;
+ AssertUtils.state(cluster != null,
+ String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
URI activeAddress = cluster.getActiveAddress();
extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
@@ -1285,7 +1292,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- assert cluster != null;
+ AssertUtils.state(cluster != null,
+ String.format("The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
}
}
@@ -1309,14 +1318,14 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
- assert buildPipeline != null;
+ AssertUtils.state(buildPipeline != null);
BuildResult buildResult = buildPipeline.getBuildResult();
if (executionMode.equals(ExecutionMode.YARN_APPLICATION)) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
} else {
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
- assert buildResult != null;
+ AssertUtils.state(buildResult != null);
DockerImageBuildResponse result = buildResult.as(DockerImageBuildResponse.class);
String ingressTemplates = application.getIngressTemplate();
String domainName = application.getDefaultModeIngress();
@@ -1484,7 +1493,9 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
// 3.1) At the remote mode, request the flink webui interface to get the savepoint path
if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- assert cluster != null;
+ AssertUtils.state(cluster != null,
+ String.format("The clusterId=%s cannot be find, maybe the clusterId is wrong or " +
+ "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId()));
Map<String, String> config = cluster.getFlinkConfig();
if (!config.isEmpty()) {
savepointPath = config.get(ConfigConst.KEY_FLINK_STATE_SAVEPOINTS_DIR().substring(6));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index 8c13e27fe..5526cf54d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.console.core.entity.Application;
@@ -168,11 +169,11 @@ public class FlinkSqlServiceImpl extends ServiceImpl<FlinkSqlMapper, FlinkSql> i
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void rollback(Application application) {
FlinkSql sql = getCandidate(application.getId(), CandidateType.HISTORY);
- assert sql != null;
+ AssertUtils.state(sql != null);
try {
// check and backup current job
FlinkSql effectiveSql = getEffective(application.getId(), false);
- assert effectiveSql != null;
+ AssertUtils.state(effectiveSql != null);
// rollback history sql
backUpService.rollbackFlinkSql(application, sql);
} catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 5dcce2041..a03e134c7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CommandUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
@@ -71,7 +72,7 @@ import java.util.concurrent.TimeUnit;
public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
implements ProjectService {
- private volatile Map<Long, Byte> tailOutMap = new ConcurrentHashMap<>();
+ private final Map<Long, Byte> tailOutMap = new ConcurrentHashMap<>();
private final Map<Long, StringBuilder> tailBuffer = new ConcurrentHashMap<>();
@@ -114,7 +115,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
public boolean update(Project projectParam) {
try {
Project project = getById(projectParam.getId());
- assert project != null;
+ AssertUtils.state(project != null);
project.setName(projectParam.getName());
project.setUrl(projectParam.getUrl());
project.setBranches(projectParam.getBranches());
@@ -147,7 +148,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
@Transactional(rollbackFor = {Exception.class})
public boolean delete(Long id) {
Project project = getById(id);
- assert project != null;
+ AssertUtils.state(project != null);
LambdaQueryWrapper<Application> queryWrapper = new LambdaQueryWrapper<Application>();
queryWrapper.eq(Application::getProjectId, id);
long count = applicationService.count(queryWrapper);
@@ -349,7 +350,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
}
List<Map<String, Object>> list = new ArrayList<>();
File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
- assert files != null;
+ AssertUtils.state(files != null);
for (File item : files) {
eachFile(item, list, true);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index e3138fce3..7000a00fd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.base.domain.Constant;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.InternalException;
@@ -62,7 +63,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
private void expire(SavePoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
- assert flinkEnv != null;
+ AssertUtils.state(flinkEnv != null);
int cpThreshold = Integer.parseInt(
flinkEnv.convertFlinkYamlAsMap()
.getOrDefault("state.checkpoints.num-retained", "5")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index 94e3e99bd..04072869c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.core.service.SqlCompleteService;
import com.google.common.collect.Sets;
@@ -183,7 +184,7 @@ public class SqlCompleteServiceImpl implements SqlCompleteService {
nowStep = nowStep.get(nowChar).getNext();
loc += 1;
}
- assert preNode != null;
+ AssertUtils.state(preNode != null);
preNode.setStop();
preNode.setCount(count);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 95fa66a51..4c7f1ab79 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.task;
import static org.apache.streampark.common.enums.ExecutionMode.isKubernetesMode;
import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
@@ -207,7 +208,7 @@ public class FlinkTrackingTask {
final OptionState optionState = OPTIONING.get(key);
try {
// query status from flink rest api
- assert application.getId() != null;
+ AssertUtils.state(application.getId() != null);
getFromFlinkRestApi(application, stopFrom);
} catch (Exception flinkException) {
// query status from yarn rest api
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index ab1bdc9c5..564137c59 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.system.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.util.ShaHashUtils;
import org.apache.streampark.console.system.entity.Menu;
@@ -84,7 +85,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
});
resPage.setRecords(users);
}
- assert resPage != null;
+ AssertUtils.state(resPage != null);
if (resPage.getTotal() == 0) {
resPage.setRecords(Collections.emptyList());
}