You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/10/18 02:29:39 UTC

[incubator-linkis] branch dev-1.3.1-errorcode updated: [flink ]module errorcode optimization and documentation (#3588)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1-errorcode
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1-errorcode by this push:
     new 9f6d28fd1 [flink ]module errorcode optimization and documentation (#3588)
9f6d28fd1 is described below

commit 9f6d28fd11e4668a9c8ee42538740ef6cfbad89a
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Tue Oct 18 10:29:34 2022 +0800

    [flink ]module errorcode optimization and documentation (#3588)
    
    * [flink ]module errorcode optimization and documentation
---
 docs/errorcode/flink-errorcode.md                  |  63 ++++++
 .../flink/client/config/Environment.java           |  21 +-
 .../flink/client/config/entries/ConfigEntry.java   |   4 +-
 .../flink/client/context/ExecutionContext.java     |  12 +-
 .../deployment/ClusterDescriptorAdapter.java       |  17 +-
 .../flink/client/result/BatchResult.java           |   8 +-
 .../flink/client/result/ChangelogResult.java       |   8 +-
 .../flink/client/result/ResultUtil.java            |  15 +-
 .../client/sql/operation/AbstractJobOperation.java |   4 +-
 .../client/sql/operation/OperationFactoryImpl.java |   7 +-
 .../sql/operation/impl/CreateViewOperation.java    |   7 +-
 .../sql/operation/impl/DescribeTableOperation.java |   4 +-
 .../sql/operation/impl/DropViewOperation.java      |   4 +-
 .../sql/operation/impl/ExplainOperation.java       |   6 +-
 .../client/sql/operation/impl/InsertOperation.java |   4 +-
 .../client/sql/operation/impl/SelectOperation.java |  11 +-
 .../sql/operation/impl/UseCatalogOperation.java    |   4 +-
 .../sql/operation/impl/UseDatabaseOperation.java   |   4 +-
 .../client/sql/parser/SqlCommandParserImpl.java    |  10 +-
 .../flink/errorcode/FlinkErrorCodeSummary.java     | 251 +++++++++++++++++++++
 .../flink/exception/ExecutorInitException.java     |   8 +-
 .../flink/exception/FlinkInitFailedException.java  |   8 +-
 .../flink/exception/JobExecutionException.java     |   8 +-
 .../flink/exception/SqlExecutionException.java     |   8 +-
 .../flink/exception/SqlParseException.java         |   8 +-
 .../flink/executor/FlinkCodeOnceExecutor.scala     |   7 +-
 .../flink/executor/FlinkExecutor.scala             |   3 +-
 .../flink/executor/FlinkOnceExecutor.scala         |   9 +-
 .../executor/FlinkSQLComputationExecutor.scala     |   7 +-
 .../flink/factory/FlinkEngineConnFactory.scala     |  14 +-
 .../flink/operator/TriggerSavepointOperator.scala  |   3 +-
 .../engineconnplugin/flink/ql/GrammarFactory.scala |   3 +-
 .../flink/setting/HudiSettings.scala               |   5 +-
 .../engineconnplugin/flink/util/ClassUtil.scala    |   6 +-
 34 files changed, 456 insertions(+), 105 deletions(-)

diff --git a/docs/errorcode/flink-errorcode.md b/docs/errorcode/flink-errorcode.md
new file mode 100644
index 000000000..c9b2d6eb2
--- /dev/null
+++ b/docs/errorcode/flink-errorcode.md
@@ -0,0 +1,63 @@
+## flink errorcode
+
+| module name(模块名) | error code(错误码) | description(描述) | enumeration name(枚举) | Exception Class(类名) |
+| -------- | -------- | ----- |-----|-----|
+|flink |16020|Cannot register module   because a module with this name is already registered.(无法注册模块,因为已注册具有此名称的模块.)|CANNOT_MODULE_ALREADY|FlinkErrorCodeSummary|
+|flink |16020|Cannot create catalog   because a catalog with this name is already registered.(无法创建目录,因为同名的目录已注册.)|CANNOT_CATALOG_ALREADY|FlinkErrorCodeSummary|
+|flink |16020|Cannot create table because a table with this name is already registered,table name(无法创建表,因为同名的表已注册,表名为):|CANNOT_TABLE_ALREADY|FlinkErrorCodeSummary|
+|flink |16020|Cannot create function because a function with this name is already registered,function name(无法创建函数,因为具有此名称的函数已注册,函数名称):|CANNOT_FUNCTION_ALREADY|FlinkErrorCodeSummary|
+|flink |16020|The sql code is empty.(sql代码为空.)|SQL_CODE_EMPTY|FlinkErrorCodeSummary|
+|flink |16020|Not support runType (不支持 runType)|NOT_SUPPORT_RUNTYPE|FlinkErrorCodeSummary|
+|flink |16020|Planner must be one of these:(规划师必须是以下之一:)|PLANNER_MUST_THESE|FlinkErrorCodeSummary|
+|flink |16020|Execution must be one of these:(执行类型必须是以下之一:)|EXECUTION_MUST_THESE|FlinkErrorCodeSummary|
+|flink |16020|Not supported YarnDeploymentTarget(不支持 YarnDeploymentTarget)|NOT_SUPPORTED_YARNTARGET|FlinkErrorCodeSummary|
+|flink |16020|Unknown checkpoint mode(未知检查点模式)|UNKNOWN_CHECKPOINT_MODE|FlinkErrorCodeSummary|
+|flink |16020|hudi jars is not exists.(hudi jars 不存在.)|HUDIJARS_NOT_EXISTS|FlinkErrorCodeSummary|
+|flink |16020|Path is not exist or is not a directory,The path is(路径不存在或不是目录,路径为):|PATH_NOT_EXIST|FlinkErrorCodeSummary|
+|flink |16020|Could not parse environment file. Cause(无法解析环境文件.原因):|BOT_PARSE_ENVIRONMENT|FlinkErrorCodeSummary|
+|flink |16020|Invalid configuration entry.(配置条目无效.)|CONFIGURATION_ENTRY_INVALID|FlinkErrorCodeSummary|
+|flink |16021|Only RESET ALL is supported now(现在只支持 RESET ALL)|ONLY_RESET_ALL|FlinkErrorCodeSummary|
+|flink |16021|Unsupported command call(不支持的命令调用) |SUPPORTED_COMMAND_CALL|FlinkErrorCodeSummary|
+|flink |16021|Only single statement is supported now(现在只支持单个语句)|ONLY_SINGLE_STATEMENT|FlinkErrorCodeSummary|
+|flink |16021|Unknown statement(未知声明):|UNKNOWN_STATEMENT|FlinkErrorCodeSummary|
+|flink |16021|Failed to parse statement.(解析语句失败.)|FAILED_PARSE_STATEMENT|FlinkErrorCodeSummary|
+|flink |16021|Failed to parse drop view statement.(无法解析 drop view 语句.)|FAILED_DROP_STATEMENT|FlinkErrorCodeSummary|
+|flink |16022|Unsupported execution type for sources.(不支持的源执行类型.)|SUPPORTED_SOURCES|FlinkErrorCodeSummary|
+|flink |16022|Unsupported execution type for sinks.(接收器不支持的执行类型.)|SUPPORTED_SINKS|FlinkErrorCodeSummary|
+|flink |16022|Unsupported function type(不支持的函数类型):|SUPPORTED_FUNCTION_TYPE|FlinkErrorCodeSummary|
+|flink |16022|Not support to transform this resultSet to JobId.(不支持将此 resultSet 转换为 JobId.)|NOT_SUPPORT_TRANSFORM|FlinkErrorCodeSummary|
+|flink |16022|has already been defined in the current session.(已在当前会话中定义.)|ALREADY_CURRENT_SESSION|FlinkErrorCodeSummary|
+|flink |16022|does not exist in the current session.(当前会话中不存在.)|NOT_EXIST_SESSION|FlinkErrorCodeSummary|
+|flink |16022|The job for this query has been canceled.(此查询的作业已取消.)|QUERY_CANCELED|FlinkErrorCodeSummary|
+|flink |16022|No job is generated, please ask admin for help!(未生成作业,请向管理员寻求帮助!)|NOT_JOB_ASD_ADMIN|FlinkErrorCodeSummary|
+|flink |16022|Not support grammar(不支持语法)|NOT_SUPPORT_GRAMMAR|FlinkErrorCodeSummary|
+|flink |16022|Error while submitting job.(提交作业时出错.)|ERROR_SUBMITTING_JOB|FlinkErrorCodeSummary|
+|flink |16022|Result The retrieved gateway address is invalid, the address is(结果 检索到的网关地址无效,地址为):|NOT_SOCKET_RETRIEVAL|FlinkErrorCodeSummary|
+|flink |16022|Could not determine address of the gateway for result retrieval  by connecting to the job manager. Please specify the gateway address manually.(无法通过连接到作业管理器来确定用于检索结果的网关地址,请手动指定网关地址.)|NOT_DETERMINE_ADDRESS_JOB|FlinkErrorCodeSummary|
+|flink |16022| Could not determine address of the gateway for result retrieval,Please specify the gateway address manually.(无法确定检索结果的网关地址,请手动指定网关地址.)|NOT_DETERMINE_ADDRESS|FlinkErrorCodeSummary|
+|flink |16022|Invalid SQL statement.(无效的 SQL 语句.)|INVALID_SQL_STATEMENT|FlinkErrorCodeSummary|
+|flink |16022|No table with this name could be found.(找不到具有此名称的表.)|NO_TABLE_FOUND|FlinkErrorCodeSummary|
+|flink |16022|Invalid SQL query.(无效的 SQL 查询.)|INVALID_SQL_QUERY|FlinkErrorCodeSummary|
+|flink |16022|Failed to switch to catalog (无法切换到目录):|FAILED_SWITCH_DATABASE|FlinkErrorCodeSummary|
+|flink |16022|Failed to switch to database (无法切换到数据库):|FAILED_SWITCH_CATALOG|FlinkErrorCodeSummary|
+|flink |16023|No job has been submitted. This is a bug.(未提交任何作业,这是一个错误.)|NO_JOB_SUBMITTED|FlinkErrorCodeSummary|
+|flink |16023|not supported savepoint operator mode(不支持保存点操作员模式)|NOT_SAVEPOINT_MODE|FlinkErrorCodeSummary|
+|flink |16023|Cluster information don't exist.(集群信息不存在.)|CLUSTER_NOT_EXIST|FlinkErrorCodeSummary|
+|flink |16023|No applicationId is exists!(不存在 applicationId!)|APPLICATIONID_NOT_EXIST|FlinkErrorCodeSummary|
+|flink |16023|The accumulator could not retrieve the result.(累加器无法检索结果.)|NOT_RETRIEVE_RESULT|FlinkErrorCodeSummary|
+|flink |16023|Not support method for requestExpectedResource.(不支持 requestExpectedResource 的方法.)|NOT_SUPPORT_METHOD|FlinkErrorCodeSummary|
+|flink |16023|Not support to do savepoint for  (不支持为保存点):|NOT_SUPPORT_SAVEPOTION|FlinkErrorCodeSummary|
+|flink |16023|Create a new instance of failure, the instance is(新建失败实例,实例为):|CREATE_INSTANCE_FAILURE|FlinkErrorCodeSummary|
+|flink |16023|Job:could not retrieve or create a cluster.(作业:无法检索或创建集群.)|NOT_CREATE_CLUSTER|FlinkErrorCodeSummary|
+|flink |16023|Job: operation failed(作业:操作失败)|OPERATION_FAILED|FlinkErrorCodeSummary|
+|flink |16023|Not support flink version, StreamExecutionEnvironment.class is not exists getConfiguration method!(不支持flink版本,StreamExecutionEnvironment.class不存在getConfiguration方法!)|NOT_FLINK_VERSION|FlinkErrorCodeSummary|
+|flink |16023|StreamExecutionEnvironment.getConfiguration() execute failed!(StreamExecutionEnvironment.getConfiguration() 执行失败!)|EXECUTE_FAILED|FlinkErrorCodeSummary|
+|flink |20001|Not support ClusterDescriptorAdapter for flink application.(不支持 flink 应用的 ClusterDescriptorAdapter.)|NOT_SUPPORT_FLINK|FlinkErrorCodeSummary|
+|flink |20001|The application start failed, since yarn applicationId is null.(应用程序启动失败,因为 yarn applicationId 为 null.)|YARN_IS_NULL|FlinkErrorCodeSummary|
+|flink |20001|Not support {} for FlinkSQLComputationExecutor.(不支持 FlinkSQLComputationExecutor 的 {}.)|NOT_SUPPORT_SIMPLENAME|FlinkErrorCodeSummary|
+|flink |20001|Fatal error, ClusterDescriptorAdapter is null, please ask admin for help.(致命错误,ClusterDescriptorAdapter 为空,请向管理员寻求帮助.)|ADAPTER_IS_NULL|FlinkErrorCodeSummary|
+
+ 
+ 
+ 
+ 
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/Environment.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/Environment.java
index 58f26d0ad..8da772042 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/Environment.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/Environment.java
@@ -38,6 +38,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.*;
+
 public class Environment {
 
   public static final String SESSION_ENTRY = "session";
@@ -83,9 +85,7 @@ public class Environment {
       final ModuleEntry entry = ModuleEntry.create(config);
       if (this.modules.containsKey(entry.getName())) {
         throw new FlinkInitFailedException(
-            String.format(
-                "Cannot register module '%s' because a module with this name is already registered.",
-                entry.getName()));
+            String.format(CANNOT_MODULE_ALREADY.getErrorDesc(), entry.getName()));
       }
       this.modules.put(entry.getName(), entry);
     }
@@ -120,10 +120,7 @@ public class Environment {
     for (Map<String, Object> config : tables) {
       final TableEntry table = TableEntry.create(config);
       if (this.tables.containsKey(table.getName())) {
-        throw new FlinkInitFailedException(
-            "Cannot create table '"
-                + table.getName()
-                + "' because a table with this name is already registered.");
+        throw new FlinkInitFailedException(CANNOT_TABLE_ALREADY.getErrorDesc() + table.getName());
       }
       this.tables.put(table.getName(), table);
     }
@@ -140,9 +137,7 @@ public class Environment {
       final FunctionEntry function = FunctionEntry.create(config);
       if (this.functions.containsKey(function.getName())) {
         throw new FlinkInitFailedException(
-            "Cannot create function '"
-                + function.getName()
-                + "' because a function with this name is already registered.");
+            CANNOT_FUNCTION_ALREADY.getErrorDesc() + function.getName());
       }
       this.functions.put(function.getName(), function);
     }
@@ -228,8 +223,7 @@ public class Environment {
     try {
       return new ConfigUtil.LowerCaseYamlMapper().readValue(url, Environment.class);
     } catch (JsonMappingException e) {
-      throw new FlinkInitFailedException(
-          "Could not parse environment file. Cause: " + e.getMessage(), e);
+      throw new FlinkInitFailedException(BOT_PARSE_ENVIRONMENT.getErrorDesc() + e.getMessage(), e);
     }
   }
 
@@ -238,8 +232,7 @@ public class Environment {
     try {
       return new ConfigUtil.LowerCaseYamlMapper().readValue(content, Environment.class);
     } catch (JsonMappingException e) {
-      throw new FlinkInitFailedException(
-          "Could not parse environment file. Cause: " + e.getMessage(), e);
+      throw new FlinkInitFailedException(BOT_PARSE_ENVIRONMENT.getErrorDesc() + e.getMessage(), e);
     }
   }
 
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/entries/ConfigEntry.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/entries/ConfigEntry.java
index 0246324f8..7fc4703f5 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/entries/ConfigEntry.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/config/entries/ConfigEntry.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.descriptors.DescriptorProperties;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.CONFIGURATION_ENTRY_INVALID;
+
 public abstract class ConfigEntry {
 
   protected final DescriptorProperties properties;
@@ -33,7 +35,7 @@ public abstract class ConfigEntry {
     try {
       validate(properties);
     } catch (ValidationException e) {
-      throw new FlinkInitFailedException("Invalid configuration entry.", e);
+      throw new FlinkInitFailedException(CONFIGURATION_ENTRY_INVALID.getErrorDesc(), e);
     }
 
     this.properties = properties;
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java
index d87708dd9..1ed537ec1 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.java
@@ -69,6 +69,8 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.*;
+
 public class ExecutionContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExecutionContext.class);
@@ -246,7 +248,7 @@ public class ExecutionContext {
                   BatchTableSourceFactory.class, sourceProperties, classLoader);
       return factory.createBatchTableSource(sourceProperties);
     }
-    throw new SqlExecutionException("Unsupported execution type for sources.");
+    throw new SqlExecutionException(SUPPORTED_SOURCES.getErrorDesc());
   }
 
   private static TableSink<?> createTableSink(
@@ -263,7 +265,7 @@ public class ExecutionContext {
               TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
       return factory.createBatchTableSink(sinkProperties);
     }
-    throw new SqlExecutionException("Unsupported execution type for sinks.");
+    throw new SqlExecutionException(SUPPORTED_SINKS.getErrorDesc());
   }
 
   private TableEnvironmentInternal createStreamTableEnvironment(
@@ -534,7 +536,8 @@ public class ExecutionContext {
         } else if (v instanceof TableFunction) {
           streamTableEnvironment.registerFunction(k, (TableFunction<?>) v);
         } else {
-          throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+          throw new SqlExecutionException(
+              SUPPORTED_FUNCTION_TYPE.getErrorDesc() + v.getClass().getName());
         }
       }
     } else {
@@ -549,7 +552,8 @@ public class ExecutionContext {
         } else if (v instanceof TableFunction) {
           batchTableEnvironment.registerFunction(k, (TableFunction<?>) v);
         } else {
-          throw new SqlExecutionException("Unsupported function type: " + v.getClass().getName());
+          throw new SqlExecutionException(
+              SUPPORTED_FUNCTION_TYPE.getErrorDesc() + v.getClass().getName());
         }
       }
     }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index 95803cd92..e2eb7e436 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -42,6 +42,8 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.*;
+
 /** Cluster Descriptor Adapter, adaptable with datastream/sql tasks(集群交互适配器,适合datastream、sql方式作业) */
 public abstract class ClusterDescriptorAdapter implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(ClusterDescriptorAdapter.class);
@@ -81,7 +83,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
   /** Returns the status of the flink job. */
   public JobStatus getJobStatus() throws JobExecutionException {
     if (jobId == null) {
-      throw new JobExecutionException("No job has been submitted. This is a bug.");
+      throw new JobExecutionException(NO_JOB_SUBMITTED.getErrorDesc());
     }
     return bridgeClientRequest(
         this.executionContext, jobId, () -> clusterClient.getJobStatus(jobId), false);
@@ -111,7 +113,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
         function = () -> clusterClient.stopWithSavepoint(jobId, false, savepoint);
         break;
       default:
-        throw new JobExecutionException("not supported savepoint operator mode " + mode);
+        throw new JobExecutionException(NOT_SAVEPOINT_MODE.getErrorDesc() + mode);
     }
     return bridgeClientRequest(this.executionContext, jobId, function, false);
   }
@@ -134,7 +136,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
     if (clusterClient == null) {
       if (this.clusterID == null) {
         LOG.error("Cluster information don't exist.");
-        throw new JobExecutionException("Cluster information don't exist.");
+        throw new JobExecutionException(CLUSTER_NOT_EXIST.getErrorDesc());
       }
       clusterDescriptor = executionContext.createClusterDescriptor();
       try {
@@ -162,9 +164,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
     try {
       method = StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
     } catch (NoSuchMethodException e) {
-      throw new JobExecutionException(
-          "Not support flink version, StreamExecutionEnvironment.class is not exists getConfiguration method!",
-          e);
+      throw new JobExecutionException(NOT_FLINK_VERSION.getErrorDesc(), e);
     }
     method.setAccessible(true);
     Configuration configuration;
@@ -172,8 +172,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
       configuration =
           (Configuration) method.invoke(executionContext.getStreamExecutionEnvironment());
     } catch (Exception e) {
-      throw new JobExecutionException(
-          "StreamExecutionEnvironment.getConfiguration() execute failed!", e);
+      throw new JobExecutionException(EXECUTE_FAILED.getErrorDesc(), e);
     }
     String applicationId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
     if (StringUtils.isNotBlank(applicationId)) {
@@ -184,7 +183,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
     }
     applicationId = executionContext.getFlinkConfig().getString(YarnConfigOptions.APPLICATION_ID);
     if (StringUtils.isBlank(applicationId) && this.clusterID == null) {
-      throw new JobExecutionException("No applicationId is exists!");
+      throw new JobExecutionException(APPLICATIONID_NOT_EXIST.getErrorDesc());
     } else if (StringUtils.isNotBlank(applicationId)) {
       configuration.setString(YarnConfigOptions.APPLICATION_ID, applicationId);
       LOG.info("Bind applicationId {} to StreamExecutionEnvironment.", applicationId);
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/BatchResult.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/BatchResult.java
index 959e94bc3..aca966a6f 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/BatchResult.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/BatchResult.java
@@ -42,6 +42,9 @@ import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.ERROR_SUBMITTING_JOB;
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.NOT_RETRIEVE_RESULT;
+
 public class BatchResult<C> extends AbstractResult<C, Row> {
 
   private Logger LOG = LoggerFactory.getLogger(getClass());
@@ -73,7 +76,8 @@ public class BatchResult<C> extends AbstractResult<C, Row> {
             (unused, throwable) -> {
               if (throwable != null) {
                 executionException.compareAndSet(
-                    null, new SqlExecutionException("Error while submitting job.", throwable));
+                    null,
+                    new SqlExecutionException(ERROR_SUBMITTING_JOB.getErrorDesc(), throwable));
               }
             });
   }
@@ -117,7 +121,7 @@ public class BatchResult<C> extends AbstractResult<C, Row> {
         final ArrayList<byte[]> accResult =
             jobExecutionResult.getAccumulatorResult(accumulatorName);
         if (accResult == null) {
-          throw new JobExecutionException("The accumulator could not retrieve the result.");
+          throw new JobExecutionException(NOT_RETRIEVE_RESULT.getErrorDesc());
         }
         final List<Row> resultTable =
             SerializedListAccumulator.deserializeList(accResult, tableSink.getSerializer());
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
index 6a3913763..0a2b68393 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
@@ -47,6 +47,9 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.ERROR_SUBMITTING_JOB;
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.NOT_SOCKET_RETRIEVAL;
+
 public class ChangelogResult extends AbstractResult<ApplicationId, Tuple2<Boolean, Row>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(ChangelogResult.class);
@@ -78,7 +81,7 @@ public class ChangelogResult extends AbstractResult<ApplicationId, Tuple2<Boolea
       // pass gateway port and address such that iterator knows where to bind to
       iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
     } catch (IOException e) {
-      throw new SqlExecutionException("Could not start socket for result retrieval.", e);
+      throw new SqlExecutionException(NOT_SOCKET_RETRIEVAL.getErrorDesc(), e);
     }
 
     // create table sink
@@ -106,7 +109,8 @@ public class ChangelogResult extends AbstractResult<ApplicationId, Tuple2<Boolea
                   if (throwable != null) {
                     LOG.warn("throwable is not null", throwable);
                     executionException.compareAndSet(
-                        null, new SqlExecutionException("Error while submitting job.", throwable));
+                        null,
+                        new SqlExecutionException(ERROR_SUBMITTING_JOB.getErrorDesc(), throwable));
                   } else {
                     LOG.warn("throwable is null");
                   }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ResultUtil.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ResultUtil.java
index a94360c3d..07fb36a38 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ResultUtil.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ResultUtil.java
@@ -35,6 +35,8 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.stream.Stream;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.*;
+
 public class ResultUtil {
 
   public static <C> BatchResult<C> createBatchResult(TableSchema schema, ExecutionConfig config) {
@@ -82,8 +84,7 @@ public class ResultUtil {
       try {
         return InetAddress.getByName(address);
       } catch (UnknownHostException e) {
-        throw new SqlExecutionException(
-            "Invalid gateway address '" + address + "' for result retrieval.", e);
+        throw new SqlExecutionException(NOT_SOCKET_RETRIEVAL.getErrorDesc() + address, e);
       }
     } else {
       // TODO cache this
@@ -97,19 +98,13 @@ public class ResultUtil {
               deploy.getResponseTimeout(),
               400);
         } catch (Exception e) {
-          throw new SqlExecutionException(
-              "Could not determine address of the gateway for result retrieval "
-                  + "by connecting to the job manager. Please specify the gateway address manually.",
-              e);
+          throw new SqlExecutionException(NOT_DETERMINE_ADDRESS_JOB.getErrorDesc(), e);
         }
       } else {
         try {
           return InetAddress.getLocalHost();
         } catch (UnknownHostException e) {
-          throw new SqlExecutionException(
-              "Could not determine address of the gateway for result retrieval. "
-                  + "Please specify the gateway address manually.",
-              e);
+          throw new SqlExecutionException(NOT_DETERMINE_ADDRESS.getErrorDesc(), e);
         }
       }
     }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/AbstractJobOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/AbstractJobOperation.java
index b8fb67cad..318f65053 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/AbstractJobOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/AbstractJobOperation.java
@@ -38,6 +38,8 @@ import java.util.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.NOT_SUPPORT_TRANSFORM;
+
 /** A default implementation of JobOperation. */
 public abstract class AbstractJobOperation extends FlinkListenerGroupImpl implements JobOperation {
 
@@ -78,7 +80,7 @@ public abstract class AbstractJobOperation extends FlinkListenerGroupImpl implem
 
   public JobID transformToJobInfo(ResultSet resultSet) throws SqlExecutionException {
     if (resultSet.getColumns().size() != 1) {
-      throw new SqlExecutionException("Not support to transform this resultSet to JobId.");
+      throw new SqlExecutionException(NOT_SUPPORT_TRANSFORM.getErrorDesc());
     }
     Row row = resultSet.getData().get(0);
     return JobID.fromHexString(row.getField(0).toString());
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
index 4f216f1a0..662922cb5 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java
@@ -23,6 +23,9 @@ import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
 import org.apache.linkis.engineconnplugin.flink.exception.SqlParseException;
 import org.apache.linkis.engineconnplugin.flink.util.ClassUtil;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.ONLY_RESET_ALL;
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.SUPPORTED_COMMAND_CALL;
+
 public class OperationFactoryImpl implements OperationFactory {
 
   private OperationFactoryImpl() {}
@@ -64,7 +67,7 @@ public class OperationFactoryImpl implements OperationFactory {
         break;
       case RESET:
         if (call.operands.length > 0) {
-          throw new SqlParseException("Only RESET ALL is supported now");
+          throw new SqlParseException(ONLY_RESET_ALL.getErrorDesc());
         }
         operation = new ResetOperation(context);
         break;
@@ -109,7 +112,7 @@ public class OperationFactoryImpl implements OperationFactory {
         operation = new ExplainOperation(context, call.operands[0]);
         break;
       default:
-        throw new SqlParseException("Unsupported command call " + call + ". This is a bug.");
+        throw new SqlParseException(SUPPORTED_COMMAND_CALL.getErrorDesc() + call);
     }
     return operation;
   }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/CreateViewOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/CreateViewOperation.java
index 817914eeb..184fb94ba 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/CreateViewOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/CreateViewOperation.java
@@ -29,6 +29,9 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.client.config.entries.TableEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.ALREADY_CURRENT_SESSION;
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.INVALID_SQL_STATEMENT;
+
 /** Operation for CREATE VIEW command. */
 public class CreateViewOperation implements NonJobOperation {
   private final ExecutionContext context;
@@ -47,7 +50,7 @@ public class CreateViewOperation implements NonJobOperation {
     TableEntry tableEntry = env.getTables().get(viewName);
     if (tableEntry instanceof ViewEntry) {
       throw new SqlExecutionException(
-          "'" + viewName + "' has already been defined in the current session.");
+          "'" + viewName + "' " + ALREADY_CURRENT_SESSION.getErrorDesc());
     }
 
     // TODO check the logic
@@ -60,7 +63,7 @@ public class CreateViewOperation implements NonJobOperation {
           });
     } catch (Throwable t) {
       // catch everything such that the query does not crash the executor
-      throw new SqlExecutionException("Invalid SQL statement.", t);
+      throw new SqlExecutionException(INVALID_SQL_STATEMENT.getErrorDesc(), t);
     }
     // Also attach the view to ExecutionContext#environment.
     env.getTables().put(viewName, ViewEntry.create(viewName, query));
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DescribeTableOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DescribeTableOperation.java
index 2e2a5c423..8f1bef397 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DescribeTableOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DescribeTableOperation.java
@@ -41,6 +41,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.NO_TABLE_FOUND;
+
 /** Operation for DESCRIBE TABLE command. */
 public class DescribeTableOperation implements NonJobOperation {
   private final ExecutionContext context;
@@ -61,7 +63,7 @@ public class DescribeTableOperation implements NonJobOperation {
       schema = context.wrapClassLoader(() -> tableEnv.from(tableName).getSchema());
     } catch (Throwable t) {
       // catch everything such that the query does not crash the executor
-      throw new SqlExecutionException("No table with this name could be found.", t);
+      throw new SqlExecutionException(NO_TABLE_FOUND.getErrorDesc(), t);
     }
 
     Map<String, String> fieldToWatermark = new HashMap<>();
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DropViewOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DropViewOperation.java
index 7c5472014..8cbc4a71a 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DropViewOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DropViewOperation.java
@@ -28,6 +28,8 @@ import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
 import org.apache.flink.table.client.config.entries.TableEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.NOT_EXIST_SESSION;
+
 /** Operation for DROP VIEW command. */
 public class DropViewOperation implements NonJobOperation {
   private final FlinkEngineConnContext context;
@@ -45,7 +47,7 @@ public class DropViewOperation implements NonJobOperation {
     Environment env = context.getExecutionContext().getEnvironment();
     TableEntry tableEntry = env.getTables().get(viewName);
     if (!(tableEntry instanceof ViewEntry) && !ifExists) {
-      throw new SqlExecutionException("'" + viewName + "' does not exist in the current session.");
+      throw new SqlExecutionException("'" + viewName + "' " + NOT_EXIST_SESSION.getErrorDesc());
     }
 
     // Here we rebuild the ExecutionContext because we want to ensure that all the remaining
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/ExplainOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/ExplainOperation.java
index 7bae5c670..df343195d 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/ExplainOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/ExplainOperation.java
@@ -31,6 +31,8 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.INVALID_SQL_STATEMENT;
+
 /** Operation for EXPLAIN command. */
 public class ExplainOperation implements NonJobOperation {
   private final ExecutionContext context;
@@ -59,7 +61,7 @@ public class ExplainOperation implements NonJobOperation {
       throw t;
     } catch (Exception t) {
       // catch everything such that the query does not crash the executor
-      throw new SqlExecutionException("Invalid SQL statement.", t);
+      throw new SqlExecutionException(INVALID_SQL_STATEMENT.getErrorDesc(), t);
     }
   }
 
@@ -71,7 +73,7 @@ public class ExplainOperation implements NonJobOperation {
       return context.wrapClassLoader(() -> tableEnv.sqlQuery(selectQuery));
     } catch (Exception t) {
       // catch everything such that the query does not crash the executor
-      throw new SqlExecutionException("Invalid SQL statement.", t);
+      throw new SqlExecutionException(INVALID_SQL_STATEMENT.getErrorDesc(), t);
     }
   }
 }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/InsertOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/InsertOperation.java
index 7a866372d..3e77221bd 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/InsertOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/InsertOperation.java
@@ -41,6 +41,8 @@ import java.util.concurrent.CompletableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.INVALID_SQL_STATEMENT;
+
 /** Operation for INSERT command. */
 public class InsertOperation extends AbstractJobOperation {
   private static final Logger LOG = LoggerFactory.getLogger(InsertOperation.class);
@@ -88,7 +90,7 @@ public class InsertOperation extends AbstractJobOperation {
     } catch (Exception t) {
       LOG.error(String.format("Invalid SQL query, sql is: %s.", statement), t);
       // catch everything such that the statement does not crash the executor
-      throw new SqlExecutionException("Invalid SQL statement.", t);
+      throw new SqlExecutionException(INVALID_SQL_STATEMENT.getErrorDesc(), t);
     }
     asyncNotify(tableResult);
     return tableResult.getJobClient().get().getJobID();
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/SelectOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/SelectOperation.java
index 9881f7f92..7d2bfe0bf 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/SelectOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/SelectOperation.java
@@ -50,6 +50,8 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.*;
+
 /** Operation for SELECT command. */
 public class SelectOperation extends AbstractJobOperation {
 
@@ -94,7 +96,7 @@ public class SelectOperation extends AbstractJobOperation {
     synchronized (lock) {
       if (result == null) {
         LOG.error("The job for this query has been canceled.");
-        throw new SqlExecutionException("The job for this query has been canceled.");
+        throw new SqlExecutionException(QUERY_CANCELED.getErrorDesc());
       }
 
       if (this.result instanceof ChangelogResult) {
@@ -180,7 +182,7 @@ public class SelectOperation extends AbstractJobOperation {
       result.close();
       LOG.error(String.format("Invalid SQL query, sql is %s.", query), t);
       // catch everything such that the query does not crash the executor
-      throw new SqlExecutionException("Invalid SQL query.", t);
+      throw new SqlExecutionException(INVALID_SQL_QUERY.getErrorDesc(), t);
     } finally {
       // Remove the temporal table object.
       executionContext.wrapClassLoader(tableEnv -> tableEnv.dropTemporaryTable(tableName));
@@ -196,8 +198,7 @@ public class SelectOperation extends AbstractJobOperation {
               result.startRetrieval(jobClient);
               return jobId;
             })
-        .orElseThrow(
-            () -> new SqlExecutionException("No job is generated, please ask admin for help!"));
+        .orElseThrow(() -> new SqlExecutionException(NOT_JOB_ASD_ADMIN.getErrorDesc()));
   }
 
   /** Creates a table using the given query in the given table environment. */
@@ -208,7 +209,7 @@ public class SelectOperation extends AbstractJobOperation {
       return context.wrapClassLoader(() -> tableEnv.sqlQuery(selectQuery));
     } catch (Exception t) {
       // catch everything such that the query does not crash the executor
-      throw new SqlExecutionException("Invalid SQL statement.", t);
+      throw new SqlExecutionException(INVALID_SQL_STATEMENT.getErrorDesc(), t);
     }
   }
 
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseCatalogOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseCatalogOperation.java
index 09808bcde..244d31ac6 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseCatalogOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseCatalogOperation.java
@@ -27,6 +27,8 @@ import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.FAILED_SWITCH_CATALOG;
+
 /** Operation for USE CATALOG command. */
 public class UseCatalogOperation implements NonJobOperation {
   private final ExecutionContext context;
@@ -49,7 +51,7 @@ public class UseCatalogOperation implements NonJobOperation {
             return null;
           });
     } catch (CatalogException e) {
-      throw new SqlExecutionException("Failed to switch to catalog " + catalogName, e);
+      throw new SqlExecutionException(FAILED_SWITCH_CATALOG.getErrorDesc() + catalogName, e);
     }
 
     return OperationUtil.OK;
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseDatabaseOperation.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseDatabaseOperation.java
index 14513ea91..7f175dbd4 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseDatabaseOperation.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/UseDatabaseOperation.java
@@ -27,6 +27,8 @@ import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.FAILED_SWITCH_DATABASE;
+
 /** Operation for USE DATABASE command. */
 public class UseDatabaseOperation implements NonJobOperation {
   private final ExecutionContext context;
@@ -48,7 +50,7 @@ public class UseDatabaseOperation implements NonJobOperation {
             return null;
           });
     } catch (CatalogException e) {
-      throw new SqlExecutionException("Failed to switch to database " + databaseName, e);
+      throw new SqlExecutionException(FAILED_SWITCH_DATABASE.getErrorDesc() + databaseName, e);
     }
     return OperationUtil.OK;
   }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
index 27ef58fa6..8fdfc2a35 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java
@@ -36,6 +36,8 @@ import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.*;
+
 public class SqlCommandParserImpl implements SqlCommandParser {
 
   @Override
@@ -80,10 +82,10 @@ public class SqlCommandParserImpl implements SqlCommandParser {
       sqlNodes = sqlParser.parseStmtList();
       // no need check the statement is valid here
     } catch (org.apache.calcite.sql.parser.SqlParseException e) {
-      throw new SqlParseException("Failed to parse statement.", e);
+      throw new SqlParseException(FAILED_PARSE_STATEMENT.getErrorDesc(), e);
     }
     if (sqlNodes.size() != 1) {
-      throw new SqlParseException("Only single statement is supported now");
+      throw new SqlParseException(ONLY_SINGLE_STATEMENT.getErrorDesc());
     }
 
     final String[] operands;
@@ -132,14 +134,14 @@ public class SqlCommandParserImpl implements SqlCommandParser {
       try {
         ifExistsField = SqlDrop.class.getDeclaredField("ifExists");
       } catch (NoSuchFieldException e) {
-        throw new SqlParseException("Failed to parse drop view statement.", e);
+        throw new SqlParseException(FAILED_DROP_STATEMENT.getErrorDesc(), e);
       }
       ifExistsField.setAccessible(true);
       boolean ifExists;
       try {
         ifExists = ifExistsField.getBoolean(dropViewNode);
       } catch (IllegalAccessException e) {
-        throw new SqlParseException("Failed to parse drop view statement.", e);
+        throw new SqlParseException(FAILED_DROP_STATEMENT.getErrorDesc(), e);
       }
 
       cmd = SqlCommand.DROP_VIEW;
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/errorcode/FlinkErrorCodeSummary.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/errorcode/FlinkErrorCodeSummary.java
new file mode 100644
index 000000000..01b47de5d
--- /dev/null
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/errorcode/FlinkErrorCodeSummary.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.errorcode;
+
+public enum FlinkErrorCodeSummary {
+  CANNOT_MODULE_ALREADY(
+      16020,
+      "Cannot register module   because a module with this name is already registered.(无法注册模块,因为已注册具有此名称的模块.)",
+      "Cannot register module   because a module with this name is already registered.(无法注册模块,因为已注册具有此名称的模块.)"),
+  CANNOT_CATALOG_ALREADY(
+      16020,
+      "Cannot create catalog   because a catalog with this name is already registered.(无法创建目录,因为同名的目录已注册.)",
+      "Cannot create catalog   because a catalog with this name is already registered.(无法创建目录,因为同名的目录已注册.)"),
+  CANNOT_TABLE_ALREADY(
+      16020,
+      "Cannot create table because a table with this name is already registered,table name(无法创建表,因为同名的表已注册,表名为):",
+      "Cannot create table because a table with this name is already registered,table name(无法创建表,因为同名的表已注册,表名为):"),
+  CANNOT_FUNCTION_ALREADY(
+      16020,
+      "Cannot create function because a function with this name is already registered,function name(无法创建函数,因为具有此名称的函数已注册,函数名称):",
+      "Cannot create function because a function with this name is already registered,function name(无法创建函数,因为具有此名称的函数已注册,函数名称):"),
+  SQL_CODE_EMPTY(16020, "The sql code is empty.(sql代码为空.)", "The sql code is empty.(sql代码为空.)"),
+  NOT_SUPPORT_RUNTYPE(
+      16020, "Not support runType (不支持 runType)", "Not support runType (不支持 runType)"),
+  PLANNER_MUST_THESE(
+      16020,
+      "Planner must be one of these:(规划师必须是以下之一:)",
+      "Planner must be one of these:(规划师必须是以下之一:)"),
+  EXECUTION_MUST_THESE(
+      16020,
+      "Execution must be one of these:(执行类型必须是以下之一:)",
+      "Execution must be one of these:(执行类型必须是以下之一:)"),
+  NOT_SUPPORTED_YARNTARGET(
+      16020,
+      "Not supported YarnDeploymentTarget(不支持 YarnDeploymentTarget)",
+      "Not supported YarnDeploymentTarget(不支持 YarnDeploymentTarget)"),
+  UNKNOWN_CHECKPOINT_MODE(
+      16020, "Unknown checkpoint mode(未知检查点模式)", "Unknown checkpoint mode(未知检查点模式)"),
+  HUDIJARS_NOT_EXISTS(
+      16020,
+      "hudi jars is not exists.(hudi jars 不存在.)",
+      "hudi jars is not exists.(hudi jars 不存在.)"),
+  PATH_NOT_EXIST(
+      16020,
+      "Path is not exist or is not a directory,The path is(路径不存在或不是目录,路径为):",
+      "Path is not exist or is not a directory,The path is(路径不存在或不是目录,路径为):"),
+  BOT_PARSE_ENVIRONMENT(
+      16020,
+      "Could not parse environment file. Cause(无法解析环境文件.原因):",
+      "Could not parse environment file. Cause(无法解析环境文件.原因):"),
+  CONFIGURATION_ENTRY_INVALID(
+      16020, "Invalid configuration entry.(配置条目无效.)", "Invalid configuration entry.(配置条目无效.)"),
+  FLINK_INIT_EXCEPTION_ID(16020, "", ""),
+  ONLY_RESET_ALL(
+      16021,
+      "Only RESET ALL is supported now(现在只支持 RESET ALL)",
+      "Only RESET ALL is supported now(现在只支持 RESET ALL)"),
+  SUPPORTED_COMMAND_CALL(
+      16021, "Unsupported command call(不支持的命令调用) ", "Unsupported command call(不支持的命令调用) "),
+  ONLY_SINGLE_STATEMENT(
+      16021,
+      "Only single statement is supported now(现在只支持单个语句) ",
+      "Only single statement is supported now(现在只支持单个语句) "),
+  UNKNOWN_STATEMENT(16021, "Unknown statement(未知声明):", "Unknown statement(未知声明):"),
+  FAILED_PARSE_STATEMENT(
+      16021, "Failed to parse statement.(解析语句失败.)", "Failed to parse statement.(解析语句失败.)"),
+  FAILED_DROP_STATEMENT(
+      16021,
+      "Failed to parse drop view statement.(无法解析 drop view 语句.)",
+      "Failed to parse drop view statement.(无法解析 drop view 语句.)"),
+  SQL_PARSE_ID(16021, "", ""),
+  SUPPORTED_SOURCES(
+      16022,
+      "Unsupported execution type for sources.(不支持的源执行类型.)",
+      "Unsupported execution type for sources.(不支持的源执行类型.)"),
+  SUPPORTED_SINKS(
+      16022,
+      "Unsupported execution type for sinks.(接收器不支持的执行类型.)",
+      "Unsupported execution type for sinks.(接收器不支持的执行类型.)"),
+  SUPPORTED_FUNCTION_TYPE(
+      16022, "Unsupported function type(不支持的函数类型):", "Unsupported function type(不支持的函数类型):"),
+  NOT_SUPPORT_TRANSFORM(
+      16022,
+      "Not support to transform this resultSet to JobId.(不支持将此 resultSet 转换为 JobId.)",
+      "Not support to transform this resultSet to JobId.(不支持将此 resultSet 转换为 JobId.)"),
+  ALREADY_CURRENT_SESSION(
+      16022,
+      "has already been defined in the current session.(已在当前会话中定义.)",
+      "has already been defined in the current session.(已在当前会话中定义.)"),
+  NOT_EXIST_SESSION(
+      16022,
+      "does not exist in the current session.(当前会话中不存在.)",
+      "does not exist in the current session.(当前会话中不存在.)"),
+  QUERY_CANCELED(
+      16022,
+      "The job for this query has been canceled.(此查询的作业已取消.)",
+      "The job for this query has been canceled.(此查询的作业已取消.)"),
+  NOT_JOB_ASD_ADMIN(
+      16022,
+      "No job is generated, please ask admin for help!(未生成作业,请向管理员寻求帮助!)",
+      "No job is generated, please ask admin for help!(未生成作业,请向管理员寻求帮助!)"),
+  NOT_SUPPORT_GRAMMAR(16022, "Not support grammar(不支持语法)", "Not support grammar(不支持语法)"),
+  ERROR_SUBMITTING_JOB(
+      16022, "Error while submitting job.(提交作业时出错.)", "Error while submitting job.(提交作业时出错.)"),
+  NOT_SOCKET_RETRIEVAL(
+      16022,
+      "Result The retrieved gateway address is invalid, the address is(结果 检索到的网关地址无效,地址为):",
+      "Result The retrieved gateway address is invalid, the address is(结果 检索到的网关地址无效,地址为):"),
+  NOT_DETERMINE_ADDRESS_JOB(
+      16022,
+      "Could not determine address of the gateway for result retrieval  by connecting to the job manager. Please specify the gateway address manually.(无法通过连接到作业管理器来确定用于检索结果的网关地址,请手动指定网关地址.)",
+      "Could not determine address of the gateway for result retrieval  by connecting to the job manager. Please specify the gateway address manually.(无法通过连接到作业管理器来确定用于检索结果的网关地址,请手动指定网关地址.)"),
+  NOT_DETERMINE_ADDRESS(
+      16022,
+      "Could not determine address of the gateway for result retrieval,Please specify the gateway address manually.(无法确定检索结果的网关地址,请手动指定网关地址.)",
+      "Could not determine address of the gateway for result retrieval,Please specify the gateway address manually.(无法确定检索结果的网关地址,请手动指定网关地址.)"),
+  INVALID_SQL_STATEMENT(
+      16022, "Invalid SQL statement.(无效的 SQL 语句.)", "Invalid SQL statement.(无效的 SQL 语句.)"),
+  NO_TABLE_FOUND(
+      16022,
+      "No table with this name could be found.(找不到具有此名称的表.)",
+      "No table with this name could be found.(找不到具有此名称的表.)"),
+  INVALID_SQL_QUERY(16022, "Invalid SQL query.(无效的 SQL 查询.)", "Invalid SQL query.(无效的 SQL 查询.)"),
+  FAILED_SWITCH_DATABASE( // message prefix; UseDatabaseOperation appends the database name
+      16022,
+      "Failed to switch to database (无法切换到数据库):",
+      "Failed to switch to database (无法切换到数据库):"),
+  FAILED_SWITCH_CATALOG( // message prefix; UseCatalogOperation appends the catalog name
+      16022, "Failed to switch to catalog (无法切换到目录):", "Failed to switch to catalog (无法切换到目录):"),
+  SQL_EXECUTION_ID(16022, "", ""),
+  NO_JOB_SUBMITTED(
+      16023,
+      "No job has been submitted. This is a bug.(未提交任何作业,这是一个错误.)",
+      "No job has been submitted. This is a bug.(未提交任何作业,这是一个错误.)"),
+  NOT_SAVEPOINT_MODE(
+      16023,
+      "not supported savepoint operator mode(不支持保存点操作员模式)",
+      "not supported savepoint operator mode(不支持保存点操作员模式)"),
+  CLUSTER_NOT_EXIST(
+      16023,
+      "Cluster information don't exist.(集群信息不存在.)",
+      "Cluster information don't exist.(集群信息不存在.)"),
+  APPLICATIONID_NOT_EXIST(
+      16023,
+      "No applicationId is exists!(不存在 applicationId!)",
+      "No applicationId is exists!(不存在 applicationId!)"),
+  NOT_RETRIEVE_RESULT(
+      16023,
+      "The accumulator could not retrieve the result.(累加器无法检索结果.)",
+      "The accumulator could not retrieve the result.(累加器无法检索结果.)"),
+  NOT_SUPPORT_METHOD(
+      16023,
+      "Not support method for requestExpectedResource.(不支持 requestExpectedResource 的方法.)",
+      "Not support method for requestExpectedResource.(不支持 requestExpectedResource 的方法.)"),
+  NOT_SUPPORT_SAVEPOTION(
+      16023,
+      "Not support to do savepoint for  (不支持为保存点):",
+      "Not support to do savepoint for (不支持为保存点):"),
+  CREATE_INSTANCE_FAILURE(
+      16023,
+      "Create a new instance of failure, the instance is(新建失败实例,实例为):",
+      "Create a new instance of failure, the instance is(新建失败实例,实例为):"),
+  NOT_CREATE_CLUSTER(
+      16023,
+      "Job:could not retrieve or create a cluster.(作业:无法检索或创建集群.)",
+      "Job:could not retrieve or create a cluster.(作业:无法检索或创建集群.)"),
+  OPERATION_FAILED(16023, "Job: operation failed(作业:操作失败)", "Job: operation failed(作业:操作失败)"),
+  NOT_FLINK_VERSION(
+      16023,
+      "Not support flink version, StreamExecutionEnvironment.class is not exists getConfiguration method!(不支持flink版本,StreamExecutionEnvironment.class不存在getConfiguration方法!)",
+      "Not support flink version, StreamExecutionEnvironment.class is not exists getConfiguration method!(不支持flink版本,StreamExecutionEnvironment.class不存在getConfiguration方法!)"),
+  EXECUTE_FAILED(
+      16023,
+      "StreamExecutionEnvironment.getConfiguration() execute failed!(StreamExecutionEnvironment.getConfiguration() 执行失败!)",
+      "StreamExecutionEnvironment.getConfiguration() execute failed!(StreamExecutionEnvironment.getConfiguration() 执行失败!)"),
+  JOB_EXECUTION_ID(16023, "", ""),
+  NOT_SUPPORT_FLINK(
+      20001,
+      "Not support ClusterDescriptorAdapter for flink application.(不支持 flink 应用的 ClusterDescriptorAdapter.)",
+      "Not support ClusterDescriptorAdapter for flink application.(不支持 flink 应用的 ClusterDescriptorAdapter.)"),
+  YARN_IS_NULL(
+      20001,
+      "The application start failed, since yarn applicationId is null.(应用程序启动失败,因为 yarn applicationId 为 null.)",
+      "The application start failed, since yarn applicationId is null.(应用程序启动失败,因为 yarn applicationId 为 null.)"),
+  NOT_SUPPORT_SIMPLENAME(
+      20001,
+      "Not support {} for FlinkSQLComputationExecutor.(不支持 FlinkSQLComputationExecutor 的 {}.)",
+      "Not support {} for FlinkSQLComputationExecutor.(不支持 FlinkSQLComputationExecutor 的 {}.)"),
+  ADAPTER_IS_NULL(
+      20001,
+      "Fatal error, ClusterDescriptorAdapter is null, please ask admin for help.(致命错误,ClusterDescriptorAdapter 为空,请向管理员寻求帮助.)",
+      "Fatal error, ClusterDescriptorAdapter is null, please ask admin for help.(致命错误,ClusterDescriptorAdapter 为空,请向管理员寻求帮助.)"),
+  EXECUTORINIT_ID(20001, "", "");
+
+  /** Numeric error code. */
+  private int errorCode;
+  /** Error description. */
+  private String errorDesc;
+  /** Possible reasons for the error. */
+  private String comment;
+
+  FlinkErrorCodeSummary(int errorCode, String errorDesc, String comment) {
+    this.errorCode = errorCode;
+    this.errorDesc = errorDesc;
+    this.comment = comment;
+  }
+
+  public int getErrorCode() {
+    return errorCode;
+  }
+
+  public void setErrorCode(int errorCode) {
+    this.errorCode = errorCode;
+  }
+
+  public String getErrorDesc() {
+    return errorDesc;
+  }
+
+  public void setErrorDesc(String errorDesc) {
+    this.errorDesc = errorDesc;
+  }
+
+  public String getComment() {
+    return comment;
+  }
+
+  public void setComment(String comment) {
+    this.comment = comment;
+  }
+
+  @Override
+  public String toString() {
+    return "errorCode: " + this.errorCode + ", errorDesc:" + this.errorDesc;
+  }
+}
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/ExecutorInitException.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/ExecutorInitException.java
index be32f7570..efd27593a 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/ExecutorInitException.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/ExecutorInitException.java
@@ -19,6 +19,8 @@ package org.apache.linkis.engineconnplugin.flink.exception;
 
 import org.apache.linkis.common.exception.ErrorException;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.EXECUTORINIT_ID;
+
 public class ExecutorInitException extends ErrorException {
 
   public static final int ERROR_CODE = 16021;
@@ -30,14 +32,14 @@ public class ExecutorInitException extends ErrorException {
   }
 
   public ExecutorInitException(String desc) {
-    super(20001, desc);
+    super(EXECUTORINIT_ID.getErrorCode(), desc);
   }
 
   public ExecutorInitException(Exception e) {
-    super(20001, e.getMessage());
+    super(EXECUTORINIT_ID.getErrorCode(), e.getMessage());
   }
 
   public ExecutorInitException() {
-    super(20001, "argument illegal");
+    super(EXECUTORINIT_ID.getErrorCode(), "argument illegal");
   }
 }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/FlinkInitFailedException.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/FlinkInitFailedException.java
index 2f3fd43c2..1ba1057f1 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/FlinkInitFailedException.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/FlinkInitFailedException.java
@@ -19,18 +19,18 @@ package org.apache.linkis.engineconnplugin.flink.exception;
 
 import org.apache.linkis.common.exception.ErrorException;
 
-public class FlinkInitFailedException extends ErrorException {
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.FLINK_INIT_EXCEPTION_ID;
 
-  public static final int ERROR_CODE = 16020;
+public class FlinkInitFailedException extends ErrorException {
 
   private static final long serialVersionUID = 1L;
 
   public FlinkInitFailedException(String msg) {
-    super(ERROR_CODE, msg);
+    super(FLINK_INIT_EXCEPTION_ID.getErrorCode(), msg);
   }
 
   public FlinkInitFailedException(String msg, Throwable cause) {
-    super(ERROR_CODE, msg);
+    super(FLINK_INIT_EXCEPTION_ID.getErrorCode(), msg);
     initCause(cause);
   }
 }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/JobExecutionException.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/JobExecutionException.java
index b8f05f30e..84bfedcf6 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/JobExecutionException.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/JobExecutionException.java
@@ -19,18 +19,18 @@ package org.apache.linkis.engineconnplugin.flink.exception;
 
 import org.apache.linkis.common.exception.ErrorException;
 
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.JOB_EXECUTION_ID;
+
 public class JobExecutionException extends ErrorException {
 
   private static final long serialVersionUID = 1L;
 
-  public static final int ERROR_CODE = 16023;
-
   public JobExecutionException(String message) {
-    super(ERROR_CODE, message);
+    super(JOB_EXECUTION_ID.getErrorCode(), message);
   }
 
   public JobExecutionException(String message, Throwable e) {
-    super(ERROR_CODE, message);
+    super(JOB_EXECUTION_ID.getErrorCode(), message);
     this.initCause(e);
   }
 }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlExecutionException.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlExecutionException.java
index 6967612f0..8602071d8 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlExecutionException.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlExecutionException.java
@@ -19,18 +19,18 @@ package org.apache.linkis.engineconnplugin.flink.exception;
 
 import org.apache.linkis.common.exception.ErrorException;
 
-public class SqlExecutionException extends ErrorException {
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.SQL_EXECUTION_ID;
 
-  public static final int ERROR_CODE = 16022;
+public class SqlExecutionException extends ErrorException {
 
   private static final long serialVersionUID = 1L;
 
   public SqlExecutionException(String message) {
-    super(ERROR_CODE, message);
+    super(SQL_EXECUTION_ID.getErrorCode(), message);
   }
 
   public SqlExecutionException(String message, Throwable e) {
-    super(ERROR_CODE, message);
+    super(SQL_EXECUTION_ID.getErrorCode(), message);
     initCause(e);
   }
 }
diff --git a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlParseException.java b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlParseException.java
index 00708da04..607055e3f 100644
--- a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlParseException.java
+++ b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/exception/SqlParseException.java
@@ -19,18 +19,18 @@ package org.apache.linkis.engineconnplugin.flink.exception;
 
 import org.apache.linkis.common.exception.ErrorException;
 
-public class SqlParseException extends ErrorException {
+import static org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary.SQL_PARSE_ID;
 
-  public static final int ERROR_CODE = 16021;
+public class SqlParseException extends ErrorException {
 
   private static final long serialVersionUID = 1L;
 
   public SqlParseException(String message) {
-    super(ERROR_CODE, message);
+    super(SQL_PARSE_ID.getErrorCode(), message);
   }
 
   public SqlParseException(String message, Throwable e) {
-    super(ERROR_CODE, message);
+    super(SQL_PARSE_ID.getErrorCode(), message);
     this.initCause(e);
   }
 }
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
index 9db949a08..9b067a0da 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.OperationFa
 import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind.SUCCESS_WITH_CONTENT
 import org.apache.linkis.engineconnplugin.flink.client.sql.parser.{SqlCommand, SqlCommandParser}
 import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.{
   FlinkInitFailedException,
   SqlParseException
@@ -60,7 +61,7 @@ class FlinkCodeOnceExecutor(
     options(TaskConstant.RUNTYPE) match {
       case "sql" =>
         if (StringUtils.isBlank(codes)) {
-          throw new FlinkInitFailedException(s"The sql code is empty.")
+          throw new FlinkInitFailedException(SQL_CODE_EMPTY.getErrorDesc)
         }
         logger.info(s"Ready to submit flink application, sql is: $codes.")
         val variableMap =
@@ -72,7 +73,7 @@ class FlinkCodeOnceExecutor(
         logger.info(s"After variable replace, sql is: $codes.")
       case runType =>
         // Now, only support sql code.
-        throw new FlinkInitFailedException(s"Not support runType $runType.")
+        throw new FlinkInitFailedException(NOT_SUPPORT_RUNTYPE.getErrorDesc + s" $runType.")
     }
     future = Utils.defaultScheduler.submit(new Runnable {
       override def run(): Unit = {
@@ -109,7 +110,7 @@ class FlinkCodeOnceExecutor(
     logger.info(s"$getId >> " + trimmedCode)
     val startTime = System.currentTimeMillis
     val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true)
-    if (!callOpt.isPresent) throw new SqlParseException("Unknown statement: " + code)
+    if (!callOpt.isPresent) throw new SqlParseException(UNKNOWN_STATEMENT.getErrorDesc + code)
     val resultSet = callOpt.get().command match {
       case SqlCommand.SET | SqlCommand.USE_CATALOG | SqlCommand.USE | SqlCommand.SHOW_MODULES |
           SqlCommand.DESCRIBE_TABLE | SqlCommand.EXPLAIN =>
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
index ff2eaafae..fcf973295 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkExecutor.scala
@@ -26,6 +26,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.Resu
 import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration
 import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration.LINKIS_FLINK_CLIENT_CORES
 import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
 import org.apache.linkis.engineconnplugin.flink.util.FlinkValueFormatUtil
 import org.apache.linkis.manager.common.entity.resource._
@@ -75,7 +76,7 @@ trait FlinkExecutor extends YarnExecutor with LabelExecutor with ResourceExecuto
   override def setExecutorLabels(labels: util.List[Label[_]]): Unit = this.executorLabels = labels
 
   override def requestExpectedResource(expectedResource: NodeResource): NodeResource =
-    throw new JobExecutionException("Not support method for requestExpectedResource.")
+    throw new JobExecutionException(NOT_SUPPORT_METHOD.getErrorDesc)
 
   protected val flinkEngineConnContext: FlinkEngineConnContext
 
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
index 4ae0aff44..411f574e7 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
@@ -30,6 +30,7 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration.{
   FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX,
   FLINK_ONCE_APP_STATUS_FETCH_INTERVAL
 }
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.ExecutorInitException
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 
@@ -50,9 +51,7 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
     ClusterDescriptorAdapterFactory.create(flinkEngineConnContext.getExecutionContext) match {
       case adapter: T => clusterDescriptor = adapter
       case _ =>
-        throw new ExecutorInitException(
-          "Not support ClusterDescriptorAdapter for flink application."
-        )
+        throw new ExecutorInitException(NOT_SUPPORT_FLINK.getErrorDesc)
     }
     val options = onceExecutorExecutionContext.getOnceExecutorContent.getJobContent.map {
       case (k, v: String) => k -> v
@@ -62,9 +61,7 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
     doSubmit(onceExecutorExecutionContext, options)
     if (isCompleted) return
     if (null == clusterDescriptor.getClusterID) {
-      throw new ExecutorInitException(
-        "The application start failed, since yarn applicationId is null."
-      )
+      throw new ExecutorInitException(YARN_IS_NULL.getErrorDesc)
     }
     setApplicationId(clusterDescriptor.getClusterID.toString)
     setApplicationURL(clusterDescriptor.getWebInterfaceUrl)
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index f701d9f46..5fea30749 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -36,6 +36,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.Resu
 import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser
 import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
 import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.{ExecutorInitException, SqlParseException}
 import org.apache.linkis.engineconnplugin.flink.listener.{
   FlinkStreamingResultSetListener,
@@ -81,9 +82,7 @@ class FlinkSQLComputationExecutor(
           s"Not support ${adapter.getClass.getSimpleName} for FlinkSQLComputationExecutor."
         )
       case _ =>
-        throw new ExecutorInitException(
-          "Fatal error, ClusterDescriptorAdapter is null, please ask admin for help."
-        )
+        throw new ExecutorInitException(ADAPTER_IS_NULL.getErrorDesc)
     }
     logger.info("Try to start a yarn-session application for interactive query.")
     clusterDescriptor.deployCluster()
@@ -104,7 +103,7 @@ class FlinkSQLComputationExecutor(
   ): ExecuteResponse = {
     val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true)
     val callSQL =
-      if (!callOpt.isPresent) throw new SqlParseException("Unknown statement: " + code)
+      if (!callOpt.isPresent) throw new SqlParseException(UNKNOWN_STATEMENT.getErrorDesc + code)
       else callOpt.get
     RelMetadataQueryBase.THREAD_PROVIDERS.set(
       JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE)
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 5867204e3..9f6d34354 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -26,6 +26,7 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
 import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
 import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._
 import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
 import org.apache.linkis.engineconnplugin.flink.setting.Settings
 import org.apache.linkis.engineconnplugin.flink.util.ClassUtil
@@ -296,7 +297,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
         val planner = FlinkEnvConfiguration.FLINK_SQL_PLANNER.getValue(options)
         if (!ExecutionEntry.AVAILABLE_PLANNERS.contains(planner.toLowerCase(Locale.getDefault))) {
           throw new FlinkInitFailedException(
-            "Planner must be one of these: " + String
+            PLANNER_MUST_THESE.getErrorDesc + String
               .join(", ", ExecutionEntry.AVAILABLE_PLANNERS)
           )
         }
@@ -307,7 +308,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
             )
         ) {
           throw new FlinkInitFailedException(
-            "Execution type must be one of these: " + String
+            EXECUTION_MUST_THESE.getErrorDesc + String
               .join(", ", ExecutionEntry.AVAILABLE_EXECUTION_TYPES)
           )
         }
@@ -337,7 +338,9 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       case YarnDeploymentTarget.APPLICATION => null
       case t =>
         logger.error(s"Not supported YarnDeploymentTarget ${t.getName}.")
-        throw new FlinkInitFailedException(s"Not supported YarnDeploymentTarget ${t.getName}.")
+        throw new FlinkInitFailedException(
+          NOT_SUPPORTED_YARNTARGET.getErrorDesc + s" ${t.getName}."
+        )
     }
     val executionContext = ExecutionContext
       .builder(
@@ -363,7 +366,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
           checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
         case "AT_LEAST_ONCE" =>
           checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
-        case _ => throw new FlinkInitFailedException(s"Unknown checkpoint mode $checkpointMode.")
+        case _ =>
+          throw new FlinkInitFailedException(
+            UNKNOWN_CHECKPOINT_MODE.getErrorDesc + s" $checkpointMode."
+          )
       }
       checkpointConfig.setCheckpointTimeout(checkpointTimeout)
       checkpointConfig.setMinPauseBetweenCheckpoints(checkpointMinPause)
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
index 87e24d6cb..5c8f15cb4 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.engineconnplugin.flink.operator
 
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
 import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor
 import org.apache.linkis.manager.common.operator.Operator
@@ -38,7 +39,7 @@ class TriggerSavepointOperator extends Operator with Logging {
         Map("writtenSavepoint" -> writtenSavepoint)
       case executor =>
         throw new JobExecutionException(
-          "Not support to do savepoint for " + executor.getClass.getSimpleName
+          NOT_SUPPORT_SAVEPOTION.getErrorDesc + executor.getClass.getSimpleName
         )
     }
   }
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/ql/GrammarFactory.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/ql/GrammarFactory.scala
index 46b5e0924..0e1c8ff57 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/ql/GrammarFactory.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/ql/GrammarFactory.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.engineconnplugin.flink.ql
 
 import org.apache.linkis.common.utils.ClassUtils
 import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException
 
 import scala.collection.convert.WrapAsScala._
@@ -34,7 +35,7 @@ object GrammarFactory {
   def getGrammars: Array[Grammar] = grammars
 
   def apply(sql: String, context: FlinkEngineConnContext): Grammar = getGrammar(sql, context)
-    .getOrElse(throw new SqlExecutionException("Not support grammar " + sql))
+    .getOrElse(throw new SqlExecutionException(NOT_SUPPORT_GRAMMAR.getErrorDesc + sql))
 
   def getGrammar(sql: String, context: FlinkEngineConnContext): Option[Grammar] =
     grammars.find(_.canParse(sql)).map(_.copy(context, sql))
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/setting/HudiSettings.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/setting/HudiSettings.scala
index 3ab96e707..faa16c527 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/setting/HudiSettings.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/setting/HudiSettings.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineconn.common.conf.EngineConnConf
 import org.apache.linkis.engineconn.common.creation.EngineCreationContext
 import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext}
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
 
 import org.apache.commons.collections.CollectionUtils
@@ -65,7 +66,7 @@ class HudiSettings extends Settings with Logging {
     val hudiJarPaths =
       Lists.newArrayList(HudiSettings.getHudiJarPaths(engineCreationContext.getOptions): _*)
     if (CollectionUtils.isEmpty(hudiJarPaths)) {
-      throw new FlinkInitFailedException(s"hudi jars is not exists.")
+      throw new FlinkInitFailedException(HUDIJARS_NOT_EXISTS.getErrorDesc)
     }
     logger.info(s"hudi jar is in $hudiJarPaths.")
     context.getDependencies.addAll(
@@ -141,7 +142,7 @@ object HudiSettings {
     }
     val lib = new File(EngineConnConf.getWorkHome, "lib")
     if (!lib.exists() || !lib.isDirectory) {
-      throw new FlinkInitFailedException(s"Path $lib is not exist or is not a directory.")
+      throw new FlinkInitFailedException(PATH_NOT_EXIST.getErrorDesc + s"$lib")
     }
     lib
       .listFiles()
diff --git a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ClassUtil.scala b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ClassUtil.scala
index 3d7c17974..8d11c4c76 100644
--- a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ClassUtil.scala
+++ b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ClassUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.engineconnplugin.flink.util
 
 import org.apache.linkis.common.utils.{ClassUtils, Utils}
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
 
 import scala.collection.convert.wrapAsScala._
@@ -33,7 +34,10 @@ object ClassUtil {
     else if (classes.length == 2) {
       val realClass = if (classes(0) == defaultValue.getClass) classes(1) else classes(0);
       Utils.tryThrow(realClass.newInstance) { t =>
-        new JobExecutionException(s"New a instance of ${clazz.getSimpleName} failed!", t);
+        new JobExecutionException(
+          CREATE_INSTANCE_FAILURE.getErrorDesc + s"${clazz.getSimpleName}",
+          t
+        );
       }
     } else {
       throw new JobExecutionException(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org