You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/10 09:15:37 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: 1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new fd2c2ee  1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)
fd2c2ee is described below

commit fd2c2eeb32ee11a763b3ce0fa99e87907901c6b8
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 10 17:15:27 2020 +0800

    1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatcht task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replase
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task prioriry refator
    
    * remove ITaskQueue
    
    * task prioriry refator
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 .../dolphinscheduler/common/enums/UdfType.java     |  11 +
 .../builder/TaskExecutionContextBuilder.java       |  32 ++-
 .../server/entity/SQLTaskExecutionContext.java     |  14 +
 .../master/consumer/TaskUpdateQueueConsumer.java   |  49 +++-
 .../master/runner/MasterBaseTaskExecThread.java    |   2 +-
 .../dolphinscheduler/server/utils/UDFUtils.java    |   6 +
 .../server/worker/WorkerServer.java                |   2 +-
 .../server/worker/task/AbstractTask.java           |   6 +-
 .../server/worker/task/shell/ShellTask.java        |   4 +-
 .../server/worker/task/sql/SqlTask.java            | 309 +++++++++++++--------
 .../service/process/ProcessService.java            |   1 -
 11 files changed, 303 insertions(+), 133 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
index 22f6752..2351cca 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
@@ -44,4 +44,15 @@ public enum UdfType {
     public String getDescp() {
         return descp;
     }
+
+    public static UdfType of(int type){
+        for(UdfType ut : values()){
+            if(ut.getCode() == type){
+                return ut;
+            }
+        }
+        throw new IllegalArgumentException("invalid type : " + type);
+    }
+
+
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 34d96aa..e917dda 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -18,11 +18,13 @@
 package org.apache.dolphinscheduler.server.builder;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
+import java.util.List;
+
 /**
  *  TaskExecutionContext builder
  */
@@ -82,6 +84,30 @@ public class TaskExecutionContextBuilder {
         return this;
     }
 
+
+    /**
+     * build SQLTask related info
+     *
+     * @param sqlTaskExecutionContext sqlTaskExecutionContext
+     * @return TaskExecutionContextBuilder
+     */
+    public TaskExecutionContextBuilder buildSQLTaskRelatedInfo(SQLTaskExecutionContext sqlTaskExecutionContext){
+        taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext);
+        return this;
+    }
+
+
+    /**
+     * build DataxTask related info
+     * @param dataxTaskExecutionContext dataxTaskExecutionContext
+     * @return TaskExecutionContextBuilder
+     */
+    public TaskExecutionContextBuilder buildDataxTaskRelatedInfo(DataxTaskExecutionContext dataxTaskExecutionContext){
+        taskExecutionContext.setDataxTaskExecutionContext(dataxTaskExecutionContext);
+        return this;
+    }
+
+
     /**
      * create
      *
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
index b1ec20d..97afb4f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -32,6 +32,11 @@ public class SQLTaskExecutionContext implements Serializable {
      * warningGroupId
      */
     private int warningGroupId;
+
+    /**
+     * connectionParams
+     */
+    private String connectionParams;
     /**
      * udf function list
      */
@@ -54,10 +59,19 @@ public class SQLTaskExecutionContext implements Serializable {
         this.udfFuncList = udfFuncList;
     }
 
+    public String getConnectionParams() {
+        return connectionParams;
+    }
+
+    public void setConnectionParams(String connectionParams) {
+        this.connectionParams = connectionParams;
+    }
+
     @Override
     public String toString() {
         return "SQLTaskExecutionContext{" +
                 "warningGroupId=" + warningGroupId +
+                ", connectionParams='" + connectionParams + '\'' +
                 ", udfFuncList=" + udfFuncList +
                 '}';
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index e3957af..ce185a5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -17,13 +17,23 @@
 
 package org.apache.dolphinscheduler.server.master.consumer;
 
+import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.UdfType;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.EnumUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskPriority;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -38,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.util.List;
 
 /**
  * TaskUpdateQueue consumer
@@ -136,10 +147,45 @@ public class TaskUpdateQueueConsumer extends Thread{
         taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
         taskInstance.setExecutePath(getExecLocalPath(taskInstance));
 
+        SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
+        DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
+
+        TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
+        if (taskType == TaskType.SQL){
+            TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+            SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
+            int datasourceId = sqlParameters.getDatasource();
+            DataSource datasource = processService.findDataSourceById(datasourceId);
+            sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+            // whether udf type
+            boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+                    && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+
+            if (udfTypeFlag){
+                String[] udfFunIds = sqlParameters.getUdfs().split(",");
+                int[] udfFunIdsArray = new int[udfFunIds.length];
+                for(int i = 0 ; i < udfFunIds.length;i++){
+                    udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
+                }
+
+                List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
+                sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+            }
+
+        }
+
+        if (taskType == TaskType.DATAX){
+
+        }
+
+
         return TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
                 .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+                .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
+                .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
                 .create();
     }
 
@@ -171,7 +217,4 @@ public class TaskUpdateQueueConsumer extends Thread{
         }
         return false;
     }
-
-
-
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index b0fd632..bfbce4f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -152,7 +152,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
 
 
     /**
-     * dispatcht task
+     * TODO dispatcht task
      * @param taskInstance taskInstance
      * @return whether submit task success
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
index 5e2e535..63efb24 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.server.utils;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -48,6 +49,11 @@ public class UDFUtils {
      * @return create function list
      */
     public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){
+
+        if (CollectionUtils.isEmpty(udfFuncs)){
+            logger.info("can't find udf function resource");
+            return null;
+        }
         // get  hive udf jar path
         String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
         logger.info("hive udf jar path : {}" , hiveUdfJarPath);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index e1872f7..2fadaf1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -97,7 +97,7 @@ public class WorkerServer {
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
         this.nettyRemotingServer.start();
 
-        //
+        // worker registry
         this.workerRegistry.registry();
 
         /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 86aed54..3ea032f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -59,7 +59,7 @@ public abstract class AbstractTask {
     /**
      *  SHELL process pid
      */
-    protected Integer processId;
+    protected int processId;
 
     /**
      * other resource manager appId , for example : YARN etc
@@ -139,11 +139,11 @@ public abstract class AbstractTask {
         this.appIds = appIds;
     }
 
-    public Integer getProcessId() {
+    public int getProcessId() {
         return processId;
     }
 
-    public void setProcessId(Integer processId) {
+    public void setProcessId(int processId) {
         this.processId = processId;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 9fa2abe..ff8f2e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -95,7 +95,7 @@ public class ShellTask extends AbstractTask {
       setAppIds(commandExecuteResult.getAppIds());
       setProcessId(commandExecuteResult.getProcessId());
     } catch (Exception e) {
-      logger.error("shell task failure", e);
+      logger.error("shell task error", e);
       setExitStatusCode(Constants.EXIT_CODE_FAILURE);
       throw e;
     }
@@ -125,8 +125,6 @@ public class ShellTask extends AbstractTask {
     }
 
     String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
-
-
     /**
      *  combining local and global parameters
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 9a45c7d..afff825 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -105,16 +105,14 @@ public class SqlTask extends AbstractTask {
                 sqlParameters.getUdfs(),
                 sqlParameters.getShowType(),
                 sqlParameters.getConnParams());
-
-        Connection con = null;
-        List<String> createFuncs = null;
         try {
+            SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
             // load class
             DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType()));
 
             // get datasource
             baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()),
-                    sqlParameters.getConnParams());
+                    sqlTaskExecutionContext.getConnectionParams());
 
             // ready to execute SQL and parameter entity Map
             SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
@@ -129,32 +127,18 @@ public class SqlTask extends AbstractTask {
                     .map(this::getSqlAndSqlParamsMap)
                     .collect(Collectors.toList());
 
-            // determine if it is UDF
-            boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
-                    && StringUtils.isNotEmpty(sqlParameters.getUdfs());
-            if(udfTypeFlag){
-                String[] ids = sqlParameters.getUdfs().split(",");
-                int[] idsArray = new int[ids.length];
-                for(int i=0;i<ids.length;i++){
-                    idsArray[i]=Integer.parseInt(ids[i]);
-                }
-                SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
-                createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger);
-            }
+            List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
+                    taskExecutionContext.getTenantCode(),
+                    logger);
 
             // execute sql task
-            con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+            executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+
+            setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
         } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+            logger.error("sql task error", e);
             throw e;
-        } finally {
-            if (con != null) {
-                try {
-                    con.close();
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(),e);
-                }
-            }
         }
     }
 
@@ -193,11 +177,11 @@ public class SqlTask extends AbstractTask {
         setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
 
         // replace the ${} of the SQL statement with the Placeholder
-        String formatSql = sql.replaceAll(rgex,"?");
+        String formatSql = sql.replaceAll(rgex, "?");
         sqlBuilder.append(formatSql);
 
         // print repalce sql
-        printReplacedSql(sql,formatSql,rgex,sqlParamsMap);
+        printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
         return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
     }
 
@@ -214,108 +198,198 @@ public class SqlTask extends AbstractTask {
      * @param createFuncs           create functions
      * @return Connection
      */
-    public Connection executeFuncAndSql(SqlBinds mainSqlBinds,
+    public void executeFuncAndSql(SqlBinds mainSqlBinds,
                                         List<SqlBinds> preStatementsBinds,
                                         List<SqlBinds> postStatementsBinds,
                                         List<String> createFuncs){
         Connection connection = null;
+        PreparedStatement stmt = null;
+        ResultSet resultSet = null;
         try {
             // if upload resource is HDFS and kerberos startup
             CommonUtils.loadKerberosConf();
 
-            // if hive , load connection params if exists
-            if (HIVE == DbType.valueOf(sqlParameters.getType())) {
-                Properties paramProp = new Properties();
-                paramProp.setProperty(USER, baseDataSource.getUser());
-                paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
-                Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
-                        SEMICOLON,
-                        HIVE_CONF);
-                paramProp.putAll(connParamMap);
-
-                connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
-                        paramProp);
-            }else{
-                connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
-                        baseDataSource.getUser(),
-                        baseDataSource.getPassword());
-            }
+
+            // create connection
+            connection = createConnection();
 
             // create temp function
             if (CollectionUtils.isNotEmpty(createFuncs)) {
-                try (Statement funcStmt = connection.createStatement()) {
-                    for (String createFunc : createFuncs) {
-                        logger.info("hive create function sql: {}", createFunc);
-                        funcStmt.execute(createFunc);
-                    }
-                }
+                createTempFunction(connection,createFuncs);
             }
 
-            for (SqlBinds sqlBind: preStatementsBinds) {
-                try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
-                    int result = stmt.executeUpdate();
-                    logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
-                }
-            }
+            // pre sql
+            preSql(connection,preStatementsBinds);
 
-            try (PreparedStatement  stmt = prepareStatementAndBind(connection, mainSqlBinds);
-                 ResultSet resultSet = stmt.executeQuery()) {
-                // decide whether to executeQuery or executeUpdate based on sqlType
-                if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
-                    // query statements need to be convert to JsonArray and inserted into Alert to send
-                    JSONArray resultJSONArray = new JSONArray();
-                    ResultSetMetaData md = resultSet.getMetaData();
-                    int num = md.getColumnCount();
-
-                    while (resultSet.next()) {
-                        JSONObject mapOfColValues = new JSONObject(true);
-                        for (int i = 1; i <= num; i++) {
-                            mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
-                        }
-                        resultJSONArray.add(mapOfColValues);
-                    }
-                    logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
-
-                    // if there is a result set
-                    if ( !resultJSONArray.isEmpty() ) {
-                        if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
-                            sendAttachment(sqlParameters.getTitle(),
-                                    JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
-                        }else{
-                            sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
-                                    JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
-                        }
-                    }
-
-                    exitStatusCode = 0;
-
-                } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
-                    // non query statement
-                    stmt.executeUpdate();
-                    exitStatusCode = 0;
-                }
-            }
 
-            for (SqlBinds sqlBind: postStatementsBinds) {
-                try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
-                    int result = stmt.executeUpdate();
-                    logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
-                }
+            stmt = prepareStatementAndBind(connection, mainSqlBinds);
+            resultSet = stmt.executeQuery();
+            // decide whether to executeQuery or executeUpdate based on sqlType
+            if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
+                // query statements need to be convert to JsonArray and inserted into Alert to send
+                resultProcess(resultSet);
+
+            } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
+                // non query statement
+                stmt.executeUpdate();
             }
+
+            postSql(connection,postStatementsBinds);
+
         } catch (Exception e) {
-            logger.error(e.getMessage(),e);
-            throw new RuntimeException(e.getMessage());
+            logger.error("execute sql error",e);
+            throw new RuntimeException("execute sql error");
         } finally {
-            try { 
-                connection.close(); 
-            } catch (Exception e) { 
-                logger.error(e.getMessage(), e); 
+            close(resultSet,stmt,connection);
+        }
+    }
+
+    /**
+     * result process
+     *
+     * @param resultSet resultSet
+     * @throws Exception
+     */
+    private void resultProcess(ResultSet resultSet) throws Exception{
+        JSONArray resultJSONArray = new JSONArray();
+        ResultSetMetaData md = resultSet.getMetaData();
+        int num = md.getColumnCount();
+
+        while (resultSet.next()) {
+            JSONObject mapOfColValues = new JSONObject(true);
+            for (int i = 1; i <= num; i++) {
+                mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
+            }
+            resultJSONArray.add(mapOfColValues);
+        }
+        logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+
+        // if there is a result set
+        if (!resultJSONArray.isEmpty() ) {
+            if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
+                sendAttachment(sqlParameters.getTitle(),
+                        JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+            }else{
+                sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
+                        JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+            }
+        }
+    }
+
+    /**
+     *  pre sql
+     *
+     * @param connection connection
+     * @param preStatementsBinds preStatementsBinds
+     */
+    private void preSql(Connection connection,
+                        List<SqlBinds> preStatementsBinds) throws Exception{
+        for (SqlBinds sqlBind: preStatementsBinds) {
+            try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
+                int result = pstmt.executeUpdate();
+                logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
+
+            }
+        }
+    }
+
+    /**
+     * post psql
+     *
+     * @param connection connection
+     * @param postStatementsBinds postStatementsBinds
+     * @throws Exception
+     */
+    private void postSql(Connection connection,
+                         List<SqlBinds> postStatementsBinds) throws Exception{
+        for (SqlBinds sqlBind: postStatementsBinds) {
+            try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
+                int result = pstmt.executeUpdate();
+                logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
+            }
+        }
+    }
+    /**
+     * create temp function
+     *
+     * @param connection connection
+     * @param createFuncs createFuncs
+     * @throws Exception
+     */
+    private void createTempFunction(Connection connection,
+                                    List<String> createFuncs) throws Exception{
+        try (Statement funcStmt = connection.createStatement()) {
+            for (String createFunc : createFuncs) {
+                logger.info("hive create function sql: {}", createFunc);
+                funcStmt.execute(createFunc);
             }
         }
+    }
+    /**
+     * create connection
+     *
+     * @return connection
+     * @throws Exception
+     */
+    private Connection createConnection() throws Exception{
+        // if hive , load connection params if exists
+        Connection connection = null;
+        if (HIVE == DbType.valueOf(sqlParameters.getType())) {
+            Properties paramProp = new Properties();
+            paramProp.setProperty(USER, baseDataSource.getUser());
+            paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
+            Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
+                    SEMICOLON,
+                    HIVE_CONF);
+            paramProp.putAll(connParamMap);
+
+            connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+                    paramProp);
+        }else{
+            connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+                    baseDataSource.getUser(),
+                    baseDataSource.getPassword());
+
+        }
         return connection;
     }
 
     /**
+     *  close jdbc resource
+     *
+     * @param resultSet resultSet
+     * @param pstmt pstmt
+     * @param connection connection
+     */
+    private void close(ResultSet resultSet,
+                       PreparedStatement pstmt,
+                       Connection connection){
+        if (resultSet != null){
+            try {
+                connection.close();
+            } catch (SQLException e) {
+
+            }
+        }
+
+        if (pstmt != null){
+            try {
+                connection.close();
+            } catch (SQLException e) {
+
+            }
+        }
+
+        if (connection != null){
+            try {
+                connection.close();
+            } catch (SQLException e) {
+
+            }
+        }
+    }
+
+    /**
      * preparedStatement bind
      * @param connection
      * @param sqlBinds
@@ -326,20 +400,19 @@ public class SqlTask extends AbstractTask {
         // is the timeout set
         boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED ||
                 TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
-        try (PreparedStatement  stmt = connection.prepareStatement(sqlBinds.getSql())) {
-            if(timeoutFlag){
-                stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
-            }
-            Map<Integer, Property> params = sqlBinds.getParamsMap();
-            if(params != null) {
-                for (Map.Entry<Integer, Property> entry : params.entrySet()) {
-                    Property prop = entry.getValue();
-                    ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
-                }
+        PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
+        if(timeoutFlag){
+            stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
+        }
+        Map<Integer, Property> params = sqlBinds.getParamsMap();
+        if(params != null) {
+            for (Map.Entry<Integer, Property> entry : params.entrySet()) {
+                Property prop = entry.getValue();
+                ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
             }
-            logger.info("prepare statement replace sql : {} ", stmt);
-            return stmt;
         }
+        logger.info("prepare statement replace sql : {} ", stmt);
+        return stmt;
     }
 
     /**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 81c523c..5c4d0ba 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1513,7 +1513,6 @@ public class ProcessService {
      * @return udf function list
      */
     public List<UdfFunc> queryUdfFunListByids(int[] ids){
-
         return udfFuncMapper.queryUdfByIdStr(ids, null);
     }