You are viewing a plain text version of this content. The canonical link for it is available in the original HTML version of this archived message.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/08 03:57:44 UTC
[dolphinscheduler] branch dev updated: [Upgrade][Install] fix
upgrade 2.0 bug (#6734)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new bdea8d6 [Upgrade][Install] fix upgrade 2.0 bug (#6734)
bdea8d6 is described below
commit bdea8d6ae4e35d64686d97f606b1e235f9664dbe
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Mon Nov 8 11:57:38 2021 +0800
[Upgrade][Install] fix upgrade 2.0 bug (#6734)
* add convert dependent/conditions
* fix upgrade 2.0 bug
* fix upgrade 2.0 bug
---
.../dolphinscheduler/dao/upgrade/JsonSplitDao.java | 36 +++++++++++-----------
.../dao/upgrade/ProcessDefinitionDao.java | 2 +-
.../dolphinscheduler/dao/upgrade/ScheduleDao.java | 2 +-
.../dolphinscheduler/dao/upgrade/UpgradeDao.java | 31 +++++++++++--------
.../mysql/dolphinscheduler_ddl_post.sql | 3 +-
5 files changed, 40 insertions(+), 34 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
index 790d33a..46c7610 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
@@ -53,7 +53,7 @@ public class JsonSplitDao {
processUpdate.setInt(2, processDefinitionLog.getTimeout());
processUpdate.setInt(3, processDefinitionLog.getTenantId());
processUpdate.setString(4, processDefinitionLog.getLocations());
- processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime());
+ processUpdate.setDate(5, new Date(processDefinitionLog.getUpdateTime().getTime()));
processUpdate.setInt(6, processDefinitionLog.getId());
processUpdate.addBatch();
@@ -70,9 +70,9 @@ public class JsonSplitDao {
insertLog.setInt(11, processDefinitionLog.getTimeout());
insertLog.setInt(12, processDefinitionLog.getTenantId());
insertLog.setInt(13, processDefinitionLog.getOperator());
- insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime());
- insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime());
- insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime());
+ insertLog.setDate(14, new Date(processDefinitionLog.getOperateTime().getTime()));
+ insertLog.setDate(15, new Date(processDefinitionLog.getCreateTime().getTime()));
+ insertLog.setDate(16, new Date(processDefinitionLog.getUpdateTime().getTime()));
insertLog.addBatch();
i++;
@@ -121,8 +121,8 @@ public class JsonSplitDao {
insert.setInt(7, processTaskRelationLog.getPostTaskVersion());
insert.setInt(8, processTaskRelationLog.getConditionType().getCode());
insert.setString(9, processTaskRelationLog.getConditionParams());
- insert.setDate(10, (Date) processTaskRelationLog.getCreateTime());
- insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime());
+ insert.setDate(10, new Date(processTaskRelationLog.getCreateTime().getTime()));
+ insert.setDate(11, new Date(processTaskRelationLog.getUpdateTime().getTime()));
insert.addBatch();
insertLog.setLong(1, processTaskRelationLog.getProjectCode());
@@ -135,9 +135,9 @@ public class JsonSplitDao {
insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode());
insertLog.setString(9, processTaskRelationLog.getConditionParams());
insertLog.setInt(10, processTaskRelationLog.getOperator());
- insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime());
- insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime());
- insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime());
+ insertLog.setDate(11, new Date(processTaskRelationLog.getOperateTime().getTime()));
+ insertLog.setDate(12, new Date(processTaskRelationLog.getCreateTime().getTime()));
+ insertLog.setDate(13, new Date(processTaskRelationLog.getUpdateTime().getTime()));
insertLog.addBatch();
i++;
@@ -169,10 +169,10 @@ public class JsonSplitDao {
public void executeJsonSplitTaskDefinition(Connection conn, List<TaskDefinitionLog> taskDefinitionLogs) {
String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
+ "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,"
- + "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ + "create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
+ "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,"
- + "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ + "operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
PreparedStatement insert = conn.prepareStatement(insertSql);
PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
@@ -193,12 +193,12 @@ public class JsonSplitDao {
insert.setInt(13, taskDefinitionLog.getFailRetryTimes());
insert.setInt(14, taskDefinitionLog.getFailRetryInterval());
insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
- insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+ insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
insert.setInt(17, taskDefinitionLog.getTimeout());
insert.setInt(18, taskDefinitionLog.getDelayTime());
insert.setString(19, taskDefinitionLog.getResourceIds());
- insert.setDate(20, (Date) taskDefinitionLog.getCreateTime());
- insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime());
+ insert.setDate(20, new Date(taskDefinitionLog.getCreateTime().getTime()));
+ insert.setDate(21, new Date(taskDefinitionLog.getUpdateTime().getTime()));
insert.addBatch();
insertLog.setLong(1, taskDefinitionLog.getCode());
@@ -216,14 +216,14 @@ public class JsonSplitDao {
insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes());
insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval());
insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
- insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+ insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
insertLog.setInt(17, taskDefinitionLog.getTimeout());
insertLog.setInt(18, taskDefinitionLog.getDelayTime());
insertLog.setString(19, taskDefinitionLog.getResourceIds());
insertLog.setInt(20, taskDefinitionLog.getOperator());
- insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime());
- insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime());
- insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime());
+ insertLog.setDate(21, new Date(taskDefinitionLog.getOperateTime().getTime()));
+ insertLog.setDate(22, new Date(taskDefinitionLog.getCreateTime().getTime()));
+ insertLog.setDate(23, new Date(taskDefinitionLog.getUpdateTime().getTime()));
insertLog.addBatch();
i++;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
index 9cb03b8..c41359a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -148,7 +148,7 @@ public class ProcessDefinitionDao {
pstmt.setLong(1, processDefinition.getCode());
long projectCode = processDefinition.getProjectCode();
if (String.valueOf(projectCode).length() <= 10) {
- Integer projectId = Integer.getInteger(String.valueOf(projectCode));
+ Integer projectId = Integer.parseInt(String.valueOf(projectCode));
if (projectIdCodeMap.containsKey(projectId)) {
projectCode = projectIdCodeMap.get(projectId);
processDefinition.setProjectCode(projectCode);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
index 0970200..80a49fd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
@@ -77,7 +77,7 @@ public class ScheduleDao {
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
long projectDefinitionCode = entry.getValue();
if (String.valueOf(projectDefinitionCode).length() <= 10) {
- Integer projectDefinitionId = Integer.getInteger(String.valueOf(projectDefinitionCode));
+ Integer projectDefinitionId = Integer.parseInt(String.valueOf(projectDefinitionCode));
if (processIdCodeMap.containsKey(projectDefinitionId)) {
projectDefinitionCode = processIdCodeMap.get(projectDefinitionId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index 235f223..9109be5 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -63,10 +63,10 @@ import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
public abstract class UpgradeDao extends AbstractBaseDao {
@@ -649,14 +649,17 @@ public abstract class UpgradeDao extends AbstractBaseDao {
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
if (param != null) {
- List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
- if (!resourceList.isEmpty()) {
+ JsonNode resourceJsonNode = param.get("resourceList");
+ if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
+ List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
- taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ","));
+ taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
+ } else {
+ taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
}
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
- taskDefinitionLog.setTaskParams(param.asText());
+ taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
}
TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
@@ -674,6 +677,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
long taskCode = SnowFlakeUtils.getInstance().nextId();
+ // System.out.println(taskCode);
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
@@ -686,7 +690,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogList.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").asText(), taskCode);
- List<String> preTasks = JSONUtils.toList(task.get("preTasks").asText(), String.class);
+ List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
@@ -745,13 +749,16 @@ public abstract class UpgradeDao extends AbstractBaseDao {
if (StringUtils.isBlank(locations)) {
return locations;
}
- Map<String, String> locationsMap = JSONUtils.toMap(locations);
- JsonNodeFactory factory = new JsonNodeFactory(false);
- ArrayNode jsonNodes = factory.arrayNode();
- for (Map.Entry<String, String> entry : locationsMap.entrySet()) {
- ObjectNode nodes = factory.objectNode();
+ Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
+ });
+ if (locationsMap == null) {
+ return locations;
+ }
+ ArrayNode jsonNodes = JSONUtils.createArrayNode();
+ for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
+ ObjectNode nodes = JSONUtils.createObjectNode();
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
- ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue());
+ ObjectNode oldNodes = entry.getValue();
nodes.put("x", oldNodes.get("x").asInt());
nodes.put("y", oldNodes.get("y").asInt());
jsonNodes.add(nodes);
diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
index 5f3f65f..dfde962 100644
--- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
+++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
@@ -15,8 +15,7 @@
* limitations under the License.
*/
-alter table t_ds_process_definition drop primary key;
-ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`);
+alter table t_ds_process_definition drop primary key, ADD PRIMARY KEY (`id`,`code`);
ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`;
ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`;
alter table t_ds_process_definition drop process_definition_json;