You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/11/08 03:57:44 UTC

[dolphinscheduler] branch dev updated: [Upgrade][Install] fix upgrade 2.0 bug (#6734)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new bdea8d6  [Upgrade][Install] fix upgrade 2.0 bug (#6734)
bdea8d6 is described below

commit bdea8d6ae4e35d64686d97f606b1e235f9664dbe
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Mon Nov 8 11:57:38 2021 +0800

    [Upgrade][Install] fix upgrade 2.0 bug (#6734)
    
    * add convert dependent/conditions
    
    * fix upgrade 2.0 bug
    
    * fix upgrade 2.0 bug
---
 .../dolphinscheduler/dao/upgrade/JsonSplitDao.java | 36 +++++++++++-----------
 .../dao/upgrade/ProcessDefinitionDao.java          |  2 +-
 .../dolphinscheduler/dao/upgrade/ScheduleDao.java  |  2 +-
 .../dolphinscheduler/dao/upgrade/UpgradeDao.java   | 31 +++++++++++--------
 .../mysql/dolphinscheduler_ddl_post.sql            |  3 +-
 5 files changed, 40 insertions(+), 34 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
index 790d33a..46c7610 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java
@@ -53,7 +53,7 @@ public class JsonSplitDao {
                 processUpdate.setInt(2, processDefinitionLog.getTimeout());
                 processUpdate.setInt(3, processDefinitionLog.getTenantId());
                 processUpdate.setString(4, processDefinitionLog.getLocations());
-                processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime());
+                processUpdate.setDate(5, new Date(processDefinitionLog.getUpdateTime().getTime()));
                 processUpdate.setInt(6, processDefinitionLog.getId());
                 processUpdate.addBatch();
 
@@ -70,9 +70,9 @@ public class JsonSplitDao {
                 insertLog.setInt(11, processDefinitionLog.getTimeout());
                 insertLog.setInt(12, processDefinitionLog.getTenantId());
                 insertLog.setInt(13, processDefinitionLog.getOperator());
-                insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime());
-                insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime());
-                insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime());
+                insertLog.setDate(14, new Date(processDefinitionLog.getOperateTime().getTime()));
+                insertLog.setDate(15, new Date(processDefinitionLog.getCreateTime().getTime()));
+                insertLog.setDate(16, new Date(processDefinitionLog.getUpdateTime().getTime()));
                 insertLog.addBatch();
 
                 i++;
@@ -121,8 +121,8 @@ public class JsonSplitDao {
                 insert.setInt(7, processTaskRelationLog.getPostTaskVersion());
                 insert.setInt(8, processTaskRelationLog.getConditionType().getCode());
                 insert.setString(9, processTaskRelationLog.getConditionParams());
-                insert.setDate(10, (Date) processTaskRelationLog.getCreateTime());
-                insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime());
+                insert.setDate(10, new Date(processTaskRelationLog.getCreateTime().getTime()));
+                insert.setDate(11, new Date(processTaskRelationLog.getUpdateTime().getTime()));
                 insert.addBatch();
 
                 insertLog.setLong(1, processTaskRelationLog.getProjectCode());
@@ -135,9 +135,9 @@ public class JsonSplitDao {
                 insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode());
                 insertLog.setString(9, processTaskRelationLog.getConditionParams());
                 insertLog.setInt(10, processTaskRelationLog.getOperator());
-                insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime());
-                insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime());
-                insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime());
+                insertLog.setDate(11, new Date(processTaskRelationLog.getOperateTime().getTime()));
+                insertLog.setDate(12, new Date(processTaskRelationLog.getCreateTime().getTime()));
+                insertLog.setDate(13, new Date(processTaskRelationLog.getUpdateTime().getTime()));
                 insertLog.addBatch();
 
                 i++;
@@ -169,10 +169,10 @@ public class JsonSplitDao {
     public void executeJsonSplitTaskDefinition(Connection conn, List<TaskDefinitionLog> taskDefinitionLogs) {
         String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
             + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,"
-            + "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+            + "create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
         String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
             + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,"
-            + "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+            + "operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
         try {
             PreparedStatement insert = conn.prepareStatement(insertSql);
             PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
@@ -193,12 +193,12 @@ public class JsonSplitDao {
                 insert.setInt(13, taskDefinitionLog.getFailRetryTimes());
                 insert.setInt(14, taskDefinitionLog.getFailRetryInterval());
                 insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
-                insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+                insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
                 insert.setInt(17, taskDefinitionLog.getTimeout());
                 insert.setInt(18, taskDefinitionLog.getDelayTime());
                 insert.setString(19, taskDefinitionLog.getResourceIds());
-                insert.setDate(20, (Date) taskDefinitionLog.getCreateTime());
-                insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime());
+                insert.setDate(20, new Date(taskDefinitionLog.getCreateTime().getTime()));
+                insert.setDate(21, new Date(taskDefinitionLog.getUpdateTime().getTime()));
                 insert.addBatch();
 
                 insertLog.setLong(1, taskDefinitionLog.getCode());
@@ -216,14 +216,14 @@ public class JsonSplitDao {
                 insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes());
                 insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval());
                 insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
-                insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+                insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
                 insertLog.setInt(17, taskDefinitionLog.getTimeout());
                 insertLog.setInt(18, taskDefinitionLog.getDelayTime());
                 insertLog.setString(19, taskDefinitionLog.getResourceIds());
                 insertLog.setInt(20, taskDefinitionLog.getOperator());
-                insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime());
-                insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime());
-                insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime());
+                insertLog.setDate(21, new Date(taskDefinitionLog.getOperateTime().getTime()));
+                insertLog.setDate(22, new Date(taskDefinitionLog.getCreateTime().getTime()));
+                insertLog.setDate(23, new Date(taskDefinitionLog.getUpdateTime().getTime()));
                 insertLog.addBatch();
 
                 i++;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
index 9cb03b8..c41359a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -148,7 +148,7 @@ public class ProcessDefinitionDao {
                     pstmt.setLong(1, processDefinition.getCode());
                     long projectCode = processDefinition.getProjectCode();
                     if (String.valueOf(projectCode).length() <= 10) {
-                        Integer projectId = Integer.getInteger(String.valueOf(projectCode));
+                        Integer projectId = Integer.parseInt(String.valueOf(projectCode));
                         if (projectIdCodeMap.containsKey(projectId)) {
                             projectCode = projectIdCodeMap.get(projectId);
                             processDefinition.setProjectCode(projectCode);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
index 0970200..80a49fd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java
@@ -77,7 +77,7 @@ public class ScheduleDao {
                 try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                     long projectDefinitionCode = entry.getValue();
                     if (String.valueOf(projectDefinitionCode).length() <= 10) {
-                        Integer projectDefinitionId = Integer.getInteger(String.valueOf(projectDefinitionCode));
+                        Integer projectDefinitionId = Integer.parseInt(String.valueOf(projectDefinitionCode));
                         if (processIdCodeMap.containsKey(projectDefinitionId)) {
                             projectDefinitionCode = processIdCodeMap.get(projectDefinitionId);
                         }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index 235f223..9109be5 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -63,10 +63,10 @@ import javax.sql.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public abstract class UpgradeDao extends AbstractBaseDao {
@@ -649,14 +649,17 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                 ObjectNode param = (ObjectNode) task.get("params");
                 TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
                 if (param != null) {
-                    List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
-                    if (!resourceList.isEmpty()) {
+                    JsonNode resourceJsonNode = param.get("resourceList");
+                    if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
+                        List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
                         List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
-                        taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ","));
+                        taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
+                    } else {
+                        taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
                     }
                     param.put("conditionResult", task.get("conditionResult"));
                     param.put("dependence", task.get("dependence"));
-                    taskDefinitionLog.setTaskParams(param.asText());
+                    taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
                 }
                 TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
                 if (timeout != null) {
@@ -674,6 +677,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                 taskDefinitionLog.setName(name);
                 taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
                 long taskCode = SnowFlakeUtils.getInstance().nextId();
+                // System.out.println(taskCode);
                 taskDefinitionLog.setCode(taskCode);
                 taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
                 taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
@@ -686,7 +690,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
                 taskDefinitionLog.setUpdateTime(now);
                 taskDefinitionLogList.add(taskDefinitionLog);
                 taskIdCodeMap.put(task.get("id").asText(), taskCode);
-                List<String> preTasks = JSONUtils.toList(task.get("preTasks").asText(), String.class);
+                List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
                 taskNamePreMap.put(name, preTasks);
                 taskNameCodeMap.put(name, taskCode);
             }
@@ -745,13 +749,16 @@ public abstract class UpgradeDao extends AbstractBaseDao {
         if (StringUtils.isBlank(locations)) {
             return locations;
         }
-        Map<String, String> locationsMap = JSONUtils.toMap(locations);
-        JsonNodeFactory factory = new JsonNodeFactory(false);
-        ArrayNode jsonNodes = factory.arrayNode();
-        for (Map.Entry<String, String> entry : locationsMap.entrySet()) {
-            ObjectNode nodes = factory.objectNode();
+        Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
+        });
+        if (locationsMap == null) {
+            return locations;
+        }
+        ArrayNode jsonNodes = JSONUtils.createArrayNode();
+        for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
+            ObjectNode nodes = JSONUtils.createObjectNode();
             nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
-            ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue());
+            ObjectNode oldNodes = entry.getValue();
             nodes.put("x", oldNodes.get("x").asInt());
             nodes.put("y", oldNodes.get("y").asInt());
             jsonNodes.add(nodes);
diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
index 5f3f65f..dfde962 100644
--- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
+++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
@@ -15,8 +15,7 @@
  * limitations under the License.
 */
 
-alter table t_ds_process_definition drop primary key;
-ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`);
+alter table t_ds_process_definition drop primary key, ADD PRIMARY KEY (`id`,`code`);
 ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`;
 ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`;
 alter table t_ds_process_definition drop process_definition_json;