You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by lg...@apache.org on 2020/05/22 10:46:13 UTC

[incubator-dolphinscheduler] branch dev-1.3.0 updated: Fix [Bug] process definition json worker group convert #2794 (#2795)

This is an automated email from the ASF dual-hosted git repository.

lgcareer pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new 0130da2  Fix [Bug] process definition json worker group convert #2794 (#2795)
0130da2 is described below

commit 0130da2bdf07880ba25765cb1b7803f0a598e3b3
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Fri May 22 18:46:05 2020 +0800

    Fix [Bug] process definition json worker group convert #2794 (#2795)
    
    * add LoggerServerTest UT
    
    * add LoggerServerTest UT
    
    * add LoggerServerTest UT
    add RemoveTaskLogRequestCommandTest UT
    add RemoveTaskLogResponseCommandTest
    
    * master select worker filter high load worker #2704
    
    * master select worker filter high load worker #2704
    
    * master select worker filter high load worker #2704
    
    * master select worker filter high load worker #2704
    
    * master select worker filter high load worker #2704
    
    * master select worker filter high load worker #2704
    
    * add not worker log and remove worker invalid property
    
    * process definition json worker group convert #2794
    
    * process definition json worker group convert #2794
    
    * process definition json worker group convert #2794
    
    * process definition json worker group convert #2794
    
    * process definition json worker group convert #2794
    
    * process definition json worker group convert #2794
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 .../dolphinscheduler/common/model/TaskNode.java    | 13 +++
 .../common/shell/ShellExecutorTest.java            | 49 ++----------
 .../dolphinscheduler/dao/entity/ProcessData.java   | 10 +++
 .../dolphinscheduler/dao/entity/WorkerGroup.java   |  3 -
 .../dao/upgrade/ProcessDefinitionDao.java          | 92 ++++++++++++++++++++++
 .../dolphinscheduler/dao/upgrade/UpgradeDao.java   | 48 +++++++++--
 .../dao/upgrade/WorkerGroupDao.java                | 65 +++++++++++++++
 .../dao/mapper/WorkerGroupMapper.xml               | 40 ----------
 .../dao/upgrade/ProcessDefinitionDaoTest.java      | 65 +++++++++++++++
 .../dao/upgrade/UpgradeDaoTest.java                | 36 +++++++++
 .../dao/upgrade/WokrerGrouopDaoTest.java           | 51 ++++++++++++
 pom.xml                                            |  4 +-
 12 files changed, 385 insertions(+), 91 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
index 35767a0..5881b02 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
@@ -124,6 +124,11 @@ public class TaskNode {
    */
   private String workerGroup;
 
+  /**
+   * worker group id
+   */
+  private Integer workerGroupId;
+
 
   /**
    * task time out
@@ -341,4 +346,12 @@ public class TaskNode {
   public void setConditionResult(String conditionResult) {
     this.conditionResult = conditionResult;
   }
+
+  public Integer getWorkerGroupId() {
+    return workerGroupId;
+  }
+
+  public void setWorkerGroupId(Integer workerGroupId) {
+    this.workerGroupId = workerGroupId;
+  }
 }
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java
index 70ca5e2..e21bc77 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/shell/ShellExecutorTest.java
@@ -33,47 +33,12 @@ public class ShellExecutorTest {
     @Test
     public void execCommand() throws InterruptedException {
 
-        ThreadPoolExecutors executors = ThreadPoolExecutors.getInstance();
-        CountDownLatch latch = new CountDownLatch(200);
-
-        executors.execute(new Runnable() {
-            @Override
-            public void run() {
-
-                try {
-                    int i =0;
-                    while(i++ <= 100){
-                        String res = ShellExecutor.execCommand("groups");
-                        logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0,5));
-                        Thread.sleep(100l);
-                        latch.countDown();
-                    }
-
-                } catch (IOException | InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        executors.execute(new Runnable() {
-            @Override
-            public void run() {
-
-                try {
-                    int i =0;
-                    while(i++ <= 100){
-                        String res = ShellExecutor.execCommand("whoami");
-                        logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result2:" + res);
-                        Thread.sleep(100l);
-                        latch.countDown();
-                    }
-
-                } catch (IOException | InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-
-        latch.await();
+        try {
+            String res = ShellExecutor.execCommand("groups");
+            logger.info("thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0, 5));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
+
 }
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java
index b563487..e9a6d99 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessData.java
@@ -102,4 +102,14 @@ public class ProcessData {
   public void setTenantId(int tenantId) {
     this.tenantId = tenantId;
   }
+
+  @Override
+  public String toString() {
+    return "ProcessData{" +
+            "tasks=" + tasks +
+            ", globalParams=" + globalParams +
+            ", timeout=" + timeout +
+            ", tenantId=" + tenantId +
+            '}';
+  }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
index bce9636..a2cc3fd 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
@@ -16,9 +16,6 @@
  */
 package org.apache.dolphinscheduler.dao.entity;
 
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
 
 import java.util.Date;
 import java.util.List;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
new file mode 100644
index 0000000..768f75c
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.upgrade;
+
+import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.*;
+
+/** Plain-JDBC access to the t_ds_process_definition table for the upgrade package. */
+public class ProcessDefinitionDao {
+
+    public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class);
+
+    /**
+     * Read the definition json of every row in t_ds_process_definition.
+     * The connection is passed to ConnectionUtils.releaseResource in the finally block,
+     * so callers should treat it as consumed once this method returns or throws.
+     * @param conn jdbc connection
+     * @return map from process definition id to its process_definition_json
+     * @throws RuntimeException wrapping the original failure if the query cannot be executed
+     */
+    public Map<Integer,String> queryAllProcessDefinition(Connection conn){
+        Map<Integer,String> processDefinitionJsonMap = new HashMap<>();
+
+        String sql = "SELECT id,process_definition_json FROM t_ds_process_definition";
+        ResultSet rs = null;
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement(sql);
+            rs = pstmt.executeQuery();
+
+            while (rs.next()){
+                Integer id = rs.getInt(1);
+                String processDefinitionJson = rs.getString(2);
+                processDefinitionJsonMap.put(id,processDefinitionJson);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(),e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            ConnectionUtils.releaseResource(rs, pstmt, conn);
+        }
+
+        return processDefinitionJsonMap;
+    }
+
+    /**
+     * Store each json of the map in the process definition row with the matching id.
+     * The connection is passed to ConnectionUtils.releaseResource in the finally block,
+     * so callers should treat it as consumed once this method returns or throws.
+     * @param conn jdbc connection
+     * @param processDefinitionJsonMap map from process definition id to the json to store
+     * @throws RuntimeException wrapping the original failure if any update cannot be executed
+     */
+    public void updateProcessDefinitionJson(Connection conn,Map<Integer,String> processDefinitionJsonMap){
+        String sql = "UPDATE t_ds_process_definition SET process_definition_json=? where id=?";
+        // one statement is prepared up front and rebound for every entry of the map
+        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
+            for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()){
+                pstmt.setString(1,entry.getValue());
+                pstmt.setInt(2,entry.getKey());
+                pstmt.executeUpdate();
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(),e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            ConnectionUtils.releaseResource(conn);
+        }
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
index e708620..692351b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
@@ -16,14 +16,12 @@
  */
 package org.apache.dolphinscheduler.dao.upgrade;
 
-import com.alibaba.druid.pool.DruidDataSource;
 import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
-import org.apache.dolphinscheduler.common.utils.SchemaUtils;
-import org.apache.dolphinscheduler.common.utils.ScriptRunner;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.AbstractBaseDao;
 import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
+import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +32,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.text.MessageFormat;
+import java.util.*;
 
 public abstract class UpgradeDao extends AbstractBaseDao {
 
@@ -44,6 +43,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
     protected static final DataSource dataSource = getDataSource();
     private static final DbType dbType = getCurrentDbType();
 
+
     @Override
     protected void init() {
 
@@ -119,6 +119,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
         // Execute the dolphinscheduler DML, it can be rolled back
         runInitDML(initSqlPath);
 
+
     }
 
     /**
@@ -256,6 +257,43 @@ public abstract class UpgradeDao extends AbstractBaseDao {
 
         upgradeDolphinSchedulerDML(schemaDir);
 
+        updateProcessDefinitionJsonWorkerGroup();
+
+
+    }
+
+    /**
+     * Convert the numeric workerGroupId of every task in every stored process definition
+     * json into a worker group name: -1 becomes "default", any other id is looked up in the
+     * old t_ds_worker_group table. Tasks without a workerGroupId are left unchanged, and
+     * definitions that parse to null or have no task list are skipped. Errors are logged, not rethrown.
+     */
+    protected void updateProcessDefinitionJsonWorkerGroup(){
+        WorkerGroupDao workerGroupDao = new WorkerGroupDao();
+        ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
+        Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
+        try {
+            Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
+            Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
+            for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
+                ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class);
+                if (processData == null || processData.getTasks() == null){
+                    continue;
+                }
+                for (TaskNode taskNode : processData.getTasks()){
+                    Integer workerGroupId = taskNode.getWorkerGroupId();
+                    if (workerGroupId != null){
+                        taskNode.setWorkerGroup(workerGroupId == -1 ? "default" : oldWorkerGroupMap.get(workerGroupId));
+                    }
+                }
+                replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData));
+            }
+            if (!replaceProcessDefinitionMap.isEmpty()){
+                processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
+            }
+        }catch (Exception e){
+            logger.error("update process definition json workergroup error",e);
+        }
     }
 
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
new file mode 100644
index 0000000..936b1d4
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.upgrade;
+
+import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Plain-JDBC reader for the old t_ds_worker_group table. */
+public class WorkerGroupDao {
+
+    public static final Logger logger = LoggerFactory.getLogger(WorkerGroupDao.class);
+
+    /**
+     * Query all old worker groups from t_ds_worker_group.
+     * The connection is passed to ConnectionUtils.releaseResource in the finally block.
+     * @param conn jdbc connection
+     * @return map from worker group id to worker group name
+     * @throws RuntimeException wrapping the original failure if the query cannot be executed
+     */
+    public Map<Integer,String> queryAllOldWorkerGroup(Connection conn){
+        Map<Integer,String> workerGroupMap = new HashMap<>();
+        String sql = "select id,name from t_ds_worker_group";
+        ResultSet rs = null;
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement(sql);
+            rs = pstmt.executeQuery();
+            while (rs.next()){
+                int id = rs.getInt(1);
+                String name = rs.getString(2);
+                workerGroupMap.put(id,name);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(),e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            ConnectionUtils.releaseResource(rs, pstmt, conn);
+        }
+
+        return workerGroupMap;
+    }
+}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
deleted file mode 100644
index 84dd4db..0000000
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
-<mapper namespace="org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper">
-    <select id="queryAllWorkerGroup" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
-        select *
-        from t_ds_worker_group
-        order by update_time desc
-    </select>
-    <select id="queryWorkerGroupByName" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
-        select *
-        from t_ds_worker_group
-        where name = #{name}
-    </select>
-    <select id="queryListPaging" resultType="org.apache.dolphinscheduler.dao.entity.WorkerGroup">
-        select *
-        from t_ds_worker_group
-        where 1 = 1
-        <if test="searchVal != null and searchVal != ''">
-            and  name like concat('%', #{searchVal}, '%')
-        </if>
-        order by  update_time desc
-    </select>
-</mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java
new file mode 100644
index 0000000..a7bbd53
--- /dev/null
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDaoTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.dao.upgrade;
+
+
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+public class ProcessDefinitionDaoTest {
+    final DataSource dataSource = getDataSource();
+    final ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
+
+    @Test
+    public void testQueryAllProcessDefinition() throws Exception{
+
+        Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
+
+        assertThat(processDefinitionJsonMap.size(),greaterThanOrEqualTo(0));
+    }
+
+    @Test
+    public void testUpdateProcessDefinitionJson() throws Exception{
+
+        Map<Integer,String> processDefinitionJsonMap = new HashMap<>();
+        processDefinitionJsonMap.put(1,"test");
+
+        processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),processDefinitionJsonMap);
+
+    }
+
+    @Test(expected = Exception.class)
+    public void testQueryAllProcessDefinitionException() throws Exception{
+        processDefinitionDao.queryAllProcessDefinition(null);
+
+    }
+
+    @Test(expected = Exception.class)
+    public void testUpdateProcessDefinitionJsonException() throws Exception{
+        processDefinitionDao.updateProcessDefinitionJson(null,null);
+
+    }
+
+
+}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java
new file mode 100644
index 0000000..ed96e92
--- /dev/null
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDaoTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.dao.upgrade;
+
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+public class UpgradeDaoTest {
+    PostgresqlUpgradeDao postgresqlUpgradeDao = PostgresqlUpgradeDao.getInstance();
+
+    /** Smoke test: the worker group conversion logs its own failures, so this only checks that it returns. */
+    @Test
+    public void testUpdateProcessDefinitionJsonWorkerGroup() throws Exception{
+        postgresqlUpgradeDao.updateProcessDefinitionJsonWorkerGroup();
+    }
+}
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java
new file mode 100644
index 0000000..2c9b80a
--- /dev/null
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/upgrade/WokrerGrouopDaoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.dao.upgrade;
+
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.dao.upgrade.UpgradeDao.getDataSource;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+public class WokrerGrouopDaoTest {
+    protected  final DataSource dataSource = getDataSource();
+
+    @Test
+    public void testQueryQueryAllOldWorkerGroup() throws Exception{
+        WorkerGroupDao workerGroupDao = new WorkerGroupDao();
+
+        Map<Integer, String> workerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
+
+        assertThat(workerGroupMap.size(),greaterThanOrEqualTo(0));
+    }
+
+    @Test(expected = Exception.class)
+    public void testQueryQueryAllOldWorkerGroupException() throws Exception{
+        WorkerGroupDao workerGroupDao = new WorkerGroupDao();
+
+        workerGroupDao.queryAllOldWorkerGroup(null);
+
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 13a4c33..d4b7a20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -829,7 +829,6 @@
                         <include>**/dao/mapper/ProjectMapperTest.java</include>
                         <include>**/dao/mapper/ProjectUserMapperTest.java</include>
                         <include>**/dao/mapper/QueueMapperTest.java</include>
-                        <!--<include>**/dao/mapper/ResourceMapperTest.java</include>-->
                         <include>**/dao/mapper/ResourceUserMapperTest.java</include>
                         <include>**/dao/mapper/ScheduleMapperTest.java</include>
                         <include>**/dao/mapper/SessionMapperTest.java</include>
@@ -841,6 +840,9 @@
                         <include>**/dao/mapper/UserMapperTest.java</include>
                         <include>**/dao/utils/DagHelperTest.java</include>
                         <include>**/dao/AlertDaoTest.java</include>
+                        <include>**/dao/upgrade/ProcessDefinitionDaoTest.java</include>
+                        <include>**/dao/upgrade/WokrerGrouopDaoTest.java</include>
+                        <include>**/dao/upgrade/UpgradeDaoTest.java</include>
                         <include>**/plugin/model/AlertDataTest.java</include>
                         <include>**/plugin/model/AlertInfoTest.java</include>
                         <include>**/plugin/utils/PropertyUtilsTest.java</include>