You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/10 09:15:37 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: 1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new fd2c2ee 1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)
fd2c2ee is described below
commit fd2c2eeb32ee11a763b3ce0fa99e87907901c6b8
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 10 17:15:27 2020 +0800
1,encapsulate the parameters required by sqltask 2,SQLTask optimization (#2135)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskProps replaced with TaskExecutionContext
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
.../dolphinscheduler/common/enums/UdfType.java | 11 +
.../builder/TaskExecutionContextBuilder.java | 32 ++-
.../server/entity/SQLTaskExecutionContext.java | 14 +
.../master/consumer/TaskUpdateQueueConsumer.java | 49 +++-
.../master/runner/MasterBaseTaskExecThread.java | 2 +-
.../dolphinscheduler/server/utils/UDFUtils.java | 6 +
.../server/worker/WorkerServer.java | 2 +-
.../server/worker/task/AbstractTask.java | 6 +-
.../server/worker/task/shell/ShellTask.java | 4 +-
.../server/worker/task/sql/SqlTask.java | 309 +++++++++++++--------
.../service/process/ProcessService.java | 1 -
11 files changed, 303 insertions(+), 133 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
index 22f6752..2351cca 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
@@ -44,4 +44,15 @@ public enum UdfType {
public String getDescp() {
return descp;
}
+
+ public static UdfType of(int type){
+ for(UdfType ut : values()){
+ if(ut.getCode() == type){
+ return ut;
+ }
+ }
+ throw new IllegalArgumentException("invalid type : " + type);
+ }
+
+
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 34d96aa..e917dda 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -18,11 +18,13 @@
package org.apache.dolphinscheduler.server.builder;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.*;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import java.util.List;
+
/**
* TaskExecutionContext builder
*/
@@ -82,6 +84,30 @@ public class TaskExecutionContextBuilder {
return this;
}
+
+ /**
+ * build SQLTask related info
+ *
+ * @param sqlTaskExecutionContext sqlTaskExecutionContext
+ * @return TaskExecutionContextBuilder
+ */
+ public TaskExecutionContextBuilder buildSQLTaskRelatedInfo(SQLTaskExecutionContext sqlTaskExecutionContext){
+ taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext);
+ return this;
+ }
+
+
+ /**
+ * build DataxTask related info
+ * @param dataxTaskExecutionContext dataxTaskExecutionContext
+ * @return TaskExecutionContextBuilder
+ */
+ public TaskExecutionContextBuilder buildDataxTaskRelatedInfo(DataxTaskExecutionContext dataxTaskExecutionContext){
+ taskExecutionContext.setDataxTaskExecutionContext(dataxTaskExecutionContext);
+ return this;
+ }
+
+
/**
* create
*
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
index b1ec20d..97afb4f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -32,6 +32,11 @@ public class SQLTaskExecutionContext implements Serializable {
* warningGroupId
*/
private int warningGroupId;
+
+ /**
+ * connectionParams
+ */
+ private String connectionParams;
/**
* udf function list
*/
@@ -54,10 +59,19 @@ public class SQLTaskExecutionContext implements Serializable {
this.udfFuncList = udfFuncList;
}
+ public String getConnectionParams() {
+ return connectionParams;
+ }
+
+ public void setConnectionParams(String connectionParams) {
+ this.connectionParams = connectionParams;
+ }
+
@Override
public String toString() {
return "SQLTaskExecutionContext{" +
"warningGroupId=" + warningGroupId +
+ ", connectionParams='" + connectionParams + '\'' +
", udfFuncList=" + udfFuncList +
'}';
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index e3957af..ce185a5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -17,13 +17,23 @@
package org.apache.dolphinscheduler.server.master.consumer;
+import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.UdfType;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -38,6 +48,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
+import java.util.List;
/**
* TaskUpdateQueue consumer
@@ -136,10 +147,45 @@ public class TaskUpdateQueueConsumer extends Thread{
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setExecutePath(getExecLocalPath(taskInstance));
+ SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
+ DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
+
+ TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
+ if (taskType == TaskType.SQL){
+ TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+ SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
+ int datasourceId = sqlParameters.getDatasource();
+ DataSource datasource = processService.findDataSourceById(datasourceId);
+ sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+ // whether udf type
+ boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+ && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+
+ if (udfTypeFlag){
+ String[] udfFunIds = sqlParameters.getUdfs().split(",");
+ int[] udfFunIdsArray = new int[udfFunIds.length];
+ for(int i = 0 ; i < udfFunIds.length;i++){
+ udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
+ }
+
+ List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
+ sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+ }
+
+ }
+
+ if (taskType == TaskType.DATAX){
+
+ }
+
+
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
+ .buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
+ .buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.create();
}
@@ -171,7 +217,4 @@ public class TaskUpdateQueueConsumer extends Thread{
}
return false;
}
-
-
-
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index b0fd632..bfbce4f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -152,7 +152,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
- * dispatcht task
+ * TODO dispatch task
* @param taskInstance taskInstance
* @return whether submit task success
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
index 5e2e535..63efb24 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -48,6 +49,11 @@ public class UDFUtils {
* @return create function list
*/
public static List<String> createFuncs(List<UdfFunc> udfFuncs, String tenantCode,Logger logger){
+
+ if (CollectionUtils.isEmpty(udfFuncs)){
+ logger.info("can't find udf function resource");
+ return null;
+ }
// get hive udf jar path
String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
logger.info("hive udf jar path : {}" , hiveUdfJarPath);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index e1872f7..2fadaf1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -97,7 +97,7 @@ public class WorkerServer {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start();
- //
+ // worker registry
this.workerRegistry.registry();
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 86aed54..3ea032f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -59,7 +59,7 @@ public abstract class AbstractTask {
/**
* SHELL process pid
*/
- protected Integer processId;
+ protected int processId;
/**
* other resource manager appId , for example : YARN etc
@@ -139,11 +139,11 @@ public abstract class AbstractTask {
this.appIds = appIds;
}
- public Integer getProcessId() {
+ public int getProcessId() {
return processId;
}
- public void setProcessId(Integer processId) {
+ public void setProcessId(int processId) {
this.processId = processId;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 9fa2abe..ff8f2e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -95,7 +95,7 @@ public class ShellTask extends AbstractTask {
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
- logger.error("shell task failure", e);
+ logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
@@ -125,8 +125,6 @@ public class ShellTask extends AbstractTask {
}
String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
-
-
/**
* combining local and global parameters
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 9a45c7d..afff825 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -105,16 +105,14 @@ public class SqlTask extends AbstractTask {
sqlParameters.getUdfs(),
sqlParameters.getShowType(),
sqlParameters.getConnParams());
-
- Connection con = null;
- List<String> createFuncs = null;
try {
+ SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
// load class
DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType()));
// get datasource
baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()),
- sqlParameters.getConnParams());
+ sqlTaskExecutionContext.getConnectionParams());
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
@@ -129,32 +127,18 @@ public class SqlTask extends AbstractTask {
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
- // determine if it is UDF
- boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
- && StringUtils.isNotEmpty(sqlParameters.getUdfs());
- if(udfTypeFlag){
- String[] ids = sqlParameters.getUdfs().split(",");
- int[] idsArray = new int[ids.length];
- for(int i=0;i<ids.length;i++){
- idsArray[i]=Integer.parseInt(ids[i]);
- }
- SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
- createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger);
- }
+ List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
+ taskExecutionContext.getTenantCode(),
+ logger);
// execute sql task
- con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+ executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+
+ setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+ logger.error("sql task error", e);
throw e;
- } finally {
- if (con != null) {
- try {
- con.close();
- } catch (SQLException e) {
- logger.error(e.getMessage(),e);
- }
- }
}
}
@@ -193,11 +177,11 @@ public class SqlTask extends AbstractTask {
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
- String formatSql = sql.replaceAll(rgex,"?");
+ String formatSql = sql.replaceAll(rgex, "?");
sqlBuilder.append(formatSql);
// print repalce sql
- printReplacedSql(sql,formatSql,rgex,sqlParamsMap);
+ printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
@@ -214,108 +198,198 @@ public class SqlTask extends AbstractTask {
* @param createFuncs create functions
* @return Connection
*/
- public Connection executeFuncAndSql(SqlBinds mainSqlBinds,
+ public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs){
Connection connection = null;
+ PreparedStatement stmt = null;
+ ResultSet resultSet = null;
try {
// if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf();
- // if hive , load connection params if exists
- if (HIVE == DbType.valueOf(sqlParameters.getType())) {
- Properties paramProp = new Properties();
- paramProp.setProperty(USER, baseDataSource.getUser());
- paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
- Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
- SEMICOLON,
- HIVE_CONF);
- paramProp.putAll(connParamMap);
-
- connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
- paramProp);
- }else{
- connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
- baseDataSource.getUser(),
- baseDataSource.getPassword());
- }
+
+ // create connection
+ connection = createConnection();
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
- try (Statement funcStmt = connection.createStatement()) {
- for (String createFunc : createFuncs) {
- logger.info("hive create function sql: {}", createFunc);
- funcStmt.execute(createFunc);
- }
- }
+ createTempFunction(connection,createFuncs);
}
- for (SqlBinds sqlBind: preStatementsBinds) {
- try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
- int result = stmt.executeUpdate();
- logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
- }
- }
+ // pre sql
+ preSql(connection,preStatementsBinds);
- try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds);
- ResultSet resultSet = stmt.executeQuery()) {
- // decide whether to executeQuery or executeUpdate based on sqlType
- if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
- // query statements need to be convert to JsonArray and inserted into Alert to send
- JSONArray resultJSONArray = new JSONArray();
- ResultSetMetaData md = resultSet.getMetaData();
- int num = md.getColumnCount();
-
- while (resultSet.next()) {
- JSONObject mapOfColValues = new JSONObject(true);
- for (int i = 1; i <= num; i++) {
- mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
- }
- resultJSONArray.add(mapOfColValues);
- }
- logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
-
- // if there is a result set
- if ( !resultJSONArray.isEmpty() ) {
- if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
- sendAttachment(sqlParameters.getTitle(),
- JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
- }else{
- sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
- JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
- }
- }
-
- exitStatusCode = 0;
-
- } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
- // non query statement
- stmt.executeUpdate();
- exitStatusCode = 0;
- }
- }
- for (SqlBinds sqlBind: postStatementsBinds) {
- try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
- int result = stmt.executeUpdate();
- logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
- }
+ stmt = prepareStatementAndBind(connection, mainSqlBinds);
+ resultSet = stmt.executeQuery();
+ // decide whether to executeQuery or executeUpdate based on sqlType
+ if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
+ // query statements need to be convert to JsonArray and inserted into Alert to send
+ resultProcess(resultSet);
+
+ } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
+ // non query statement
+ stmt.executeUpdate();
}
+
+ postSql(connection,postStatementsBinds);
+
} catch (Exception e) {
- logger.error(e.getMessage(),e);
- throw new RuntimeException(e.getMessage());
+ logger.error("execute sql error",e);
+ throw new RuntimeException("execute sql error");
} finally {
- try {
- connection.close();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
+ close(resultSet,stmt,connection);
+ }
+ }
+
+ /**
+ * result process
+ *
+ * @param resultSet resultSet
+ * @throws Exception
+ */
+ private void resultProcess(ResultSet resultSet) throws Exception{
+ JSONArray resultJSONArray = new JSONArray();
+ ResultSetMetaData md = resultSet.getMetaData();
+ int num = md.getColumnCount();
+
+ while (resultSet.next()) {
+ JSONObject mapOfColValues = new JSONObject(true);
+ for (int i = 1; i <= num; i++) {
+ mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
+ }
+ resultJSONArray.add(mapOfColValues);
+ }
+ logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+
+ // if there is a result set
+ if (!resultJSONArray.isEmpty() ) {
+ if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
+ sendAttachment(sqlParameters.getTitle(),
+ JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+ }else{
+ sendAttachment(taskExecutionContext.getTaskName() + " query resultsets ",
+ JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+ }
+ }
+ }
+
+ /**
+ * pre sql
+ *
+ * @param connection connection
+ * @param preStatementsBinds preStatementsBinds
+ */
+ private void preSql(Connection connection,
+ List<SqlBinds> preStatementsBinds) throws Exception{
+ for (SqlBinds sqlBind: preStatementsBinds) {
+ try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
+ int result = pstmt.executeUpdate();
+ logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
+
+ }
+ }
+ }
+
+ /**
+ * post sql
+ *
+ * @param connection connection
+ * @param postStatementsBinds postStatementsBinds
+ * @throws Exception
+ */
+ private void postSql(Connection connection,
+ List<SqlBinds> postStatementsBinds) throws Exception{
+ for (SqlBinds sqlBind: postStatementsBinds) {
+ try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
+ int result = pstmt.executeUpdate();
+ logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
+ }
+ }
+ }
+ /**
+ * create temp function
+ *
+ * @param connection connection
+ * @param createFuncs createFuncs
+ * @throws Exception
+ */
+ private void createTempFunction(Connection connection,
+ List<String> createFuncs) throws Exception{
+ try (Statement funcStmt = connection.createStatement()) {
+ for (String createFunc : createFuncs) {
+ logger.info("hive create function sql: {}", createFunc);
+ funcStmt.execute(createFunc);
}
}
+ }
+ /**
+ * create connection
+ *
+ * @return connection
+ * @throws Exception
+ */
+ private Connection createConnection() throws Exception{
+ // if hive , load connection params if exists
+ Connection connection = null;
+ if (HIVE == DbType.valueOf(sqlParameters.getType())) {
+ Properties paramProp = new Properties();
+ paramProp.setProperty(USER, baseDataSource.getUser());
+ paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
+ Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
+ SEMICOLON,
+ HIVE_CONF);
+ paramProp.putAll(connParamMap);
+
+ connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+ paramProp);
+ }else{
+ connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+ baseDataSource.getUser(),
+ baseDataSource.getPassword());
+
+ }
return connection;
}
/**
+ * close jdbc resource
+ *
+ * @param resultSet resultSet
+ * @param pstmt pstmt
+ * @param connection connection
+ */
+ private void close(ResultSet resultSet,
+ PreparedStatement pstmt,
+ Connection connection){
+ if (resultSet != null){
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ logger.error("close result set error", e);
+ }
+ }
+ // release the statement before the connection that created it
+ if (pstmt != null){
+ try {
+ pstmt.close();
+ } catch (SQLException e) {
+ logger.error("close prepared statement error", e);
+ }
+ }
+ // close the connection last; a failure above must not prevent this
+ if (connection != null){
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ logger.error("close connection error", e);
+ }
+ }
+ }
+
+ /**
* preparedStatement bind
* @param connection
* @param sqlBinds
@@ -326,20 +400,19 @@ public class SqlTask extends AbstractTask {
// is the timeout set
boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED ||
TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
- try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) {
- if(timeoutFlag){
- stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
- }
- Map<Integer, Property> params = sqlBinds.getParamsMap();
- if(params != null) {
- for (Map.Entry<Integer, Property> entry : params.entrySet()) {
- Property prop = entry.getValue();
- ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
- }
+ PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
+ if(timeoutFlag){
+ stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
+ }
+ Map<Integer, Property> params = sqlBinds.getParamsMap();
+ if(params != null) {
+ for (Map.Entry<Integer, Property> entry : params.entrySet()) {
+ Property prop = entry.getValue();
+ ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
- logger.info("prepare statement replace sql : {} ", stmt);
- return stmt;
}
+ logger.info("prepare statement replace sql : {} ", stmt);
+ return stmt;
}
/**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 81c523c..5c4d0ba 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1513,7 +1513,6 @@ public class ProcessService {
* @return udf function list
*/
public List<UdfFunc> queryUdfFunListByids(int[] ids){
-
return udfFuncMapper.queryUdfByIdStr(ids, null);
}