You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/08/23 02:54:29 UTC
[dolphinscheduler] branch dev updated: [improvement] Add two parameters in workergroup, and support the application of description display and other parameters (#11542)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new abfef1a929 [improvement] Add two parameters in workergroup, and support the application of description display and other parameters (#11542)
abfef1a929 is described below
commit abfef1a929b0b28f2e5bce4f52a571fe33a6fe86
Author: insist777 <84...@users.noreply.github.com>
AuthorDate: Tue Aug 23 10:54:17 2022 +0800
[improvement] Add two parameters in workergroup, and support the application of description display and other parameters (#11542)
* [Improvement] Improvement default worker group
* [Improvement] Improvement default worker group
* [Improvement] Improvement default worker group
* [Improvement] Improvement default worker group
* [Improvement] Improvement default worker group
* [improvement] The default worker grouping supports editing
* [improvement] The default worker grouping supports editing
* Update WorkerGroupServiceImpl.java
* Update WorkerGroupServiceImpl.java
* The default worker grouping supports editing
* update sql file
* update
* update
* update
* update
---
.../api/controller/WorkerGroupController.java | 15 +++--
.../api/service/WorkerGroupService.java | 4 +-
.../api/service/impl/WorkerGroupServiceImpl.java | 27 +++++++--
.../api/controller/WorkerGroupControllerTest.java | 2 +
.../dolphinscheduler/api/utils/ResultTest.java | 6 +-
.../dolphinscheduler/dao/entity/WorkerGroup.java | 64 ++--------------------
.../dao/mapper/WorkerGroupMapper.java | 1 +
.../dao/mapper/WorkerGroupMapper.xml | 1 +
.../src/main/resources/sql/dolphinscheduler_h2.sql | 2 +
.../main/resources/sql/dolphinscheduler_mysql.sql | 2 +
.../resources/sql/dolphinscheduler_postgresql.sql | 2 +
.../3.1.1_schema/mysql/dolphinscheduler_ddl.sql | 20 +++++++
.../3.1.1_schema/mysql/dolphinscheduler_dml.sql | 16 ++++++
.../postgresql/dolphinscheduler_ddl.sql | 54 ++++++++++++++++++
.../postgresql/dolphinscheduler_dml.sql | 30 ++++++++++
.../server/master/registry/ServerNodeManager.java | 31 ++++++-----
16 files changed, 193 insertions(+), 84 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
index ccb196eb6a..69e1a57d0f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
@@ -73,9 +73,11 @@ public class WorkerGroupController extends BaseController {
*/
@ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataTypeClass = int.class, example = "10", defaultValue = "0"),
- @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataTypeClass = String.class),
- @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataTypeClass = String.class)
+ @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
+ @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
+ @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String"),
+ @ApiImplicitParam(name = "description", value = "WORKER_DESC", required = false, dataType = "String"),
+ @ApiImplicitParam(name = "otherParamsJson", value = "WORKER_PARMS_JSON", required = false, dataType = "String"),
})
@PostMapping()
@ResponseStatus(HttpStatus.OK)
@@ -84,8 +86,11 @@ public class WorkerGroupController extends BaseController {
public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
- @RequestParam(value = "addrList") String addrList) {
- Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList);
+ @RequestParam(value = "addrList") String addrList,
+ @RequestParam(value = "description",required = false, defaultValue = "") String description,
+ @RequestParam(value = "otherParamsJson",required = false, defaultValue = "") String otherParamsJson
+ ) {
+ Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson);
return returnDataList(result);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 2de4134db5..4c474b75cd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -34,9 +34,11 @@ public interface WorkerGroupService {
* @param id worker group id
* @param name worker group name
* @param addrList addr list
+ * @param description description
+ * @param otherParamsJson otherParamsJson
* @return create or update result code
*/
- Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList);
+ Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson);
/**
* query worker group paging
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index d582549144..eee031942c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -85,7 +85,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
*/
@Override
@Transactional
- public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList) {
+ public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser,null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
@@ -111,6 +111,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
+ workerGroup.setDescription(description);
if (checkWorkerGroupNameExists(workerGroup)) {
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
@@ -121,14 +122,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}
+ handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
+ putMsg(result, Status.SUCCESS);
+ return result;
+ }
+
+ protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser, String otherParamsJson) {
if (workerGroup.getId() != 0) {
workerGroupMapper.updateById(workerGroup);
} else {
workerGroupMapper.insert(workerGroup);
permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger);
}
- putMsg(result, Status.SUCCESS);
- return result;
}
/**
@@ -275,6 +280,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
} else {
workerGroups = workerGroupMapper.queryAllWorkerGroup();
}
+
// worker groups from zookeeper
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
Collection<String> workerGroupList = null;
@@ -292,7 +298,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
return workerGroups;
}
-
+ Map<String, WorkerGroup> workerGroupsMap = null;
+ if (workerGroups.size() != 0) {
+ workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName, workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup));
+ }
for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
Collection<String> childrenNodes = null;
@@ -305,19 +314,27 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
continue;
}
WorkerGroup wg = new WorkerGroup();
+ handleAddrList(wg, workerGroup, childrenNodes);
wg.setName(workerGroup);
if (isPaging) {
- wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue);
wg.setCreateTime(new Date(heartBeat.getStartupTime()));
wg.setUpdateTime(new Date(heartBeat.getReportTime()));
wg.setSystemDefault(true);
+ if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
+ wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
+ workerGroups.remove(workerGroupsMap.get(workerGroup));
+ }
}
workerGroups.add(wg);
}
return workerGroups;
}
+
+ protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection<String> childrenNodes) {
+ wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
+ }
/**
* delete worker group by id
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index 0f639d3fa7..72bad1e686 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -72,6 +72,8 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");
paramsMap.add("addrList","192.168.0.1,192.168.0.2");
+ paramsMap.add("description","");
+ paramsMap.add("otherParamsJson","");
MvcResult mvcResult = mockMvc.perform(post("/worker-groups")
.header("sessionId", sessionId)
.params(paramsMap))
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java
index 01fb75cdf7..6301a44468 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java
@@ -16,11 +16,15 @@
*/
package org.apache.dolphinscheduler.api.utils;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.*;
@@ -45,4 +49,4 @@ public class ResultTest {
Result ret = Result.errorWithArgs(Status.INTERNAL_SERVER_ERROR_ARGS, "test internal server error");
Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), ret.getCode().intValue());
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
index cc19dd5649..2fdd711549 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
@@ -23,11 +23,13 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
/**
* worker group
*/
@TableName("t_ds_worker_group")
+@Data
public class WorkerGroup {
@TableId(value = "id", type = IdType.AUTO)
@@ -41,67 +43,11 @@ public class WorkerGroup {
private Date updateTime;
+ private String description;
+
@TableField(exist = false)
private boolean systemDefault;
- public int getId() {
- return id;
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getAddrList() {
- return addrList;
- }
-
- public void setAddrList(String addrList) {
- this.addrList = addrList;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-
- public Date getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(Date updateTime) {
- this.updateTime = updateTime;
- }
-
- public boolean getSystemDefault() {
- return systemDefault;
- }
-
- public void setSystemDefault(boolean systemDefault) {
- this.systemDefault = systemDefault;
- }
-
- @Override
- public String toString() {
- return "WorkerGroup{"
- + "id= " + id
- + ", name= " + name
- + ", addrList= " + addrList
- + ", createTime= " + createTime
- + ", updateTime= " + updateTime
- + ", systemDefault= " + systemDefault
- + "}";
- }
+ private String otherParamsJson;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index 36a57537b5..fcff987d02 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -61,4 +61,5 @@ public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
* @return worker group list
*/
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
+
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
index 2665f123cf..79305d1467 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
@@ -28,4 +28,5 @@
from t_ds_worker_group
where name = #{name}
</select>
+
</mapper>
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index 5211976ca0..be90dda167 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -962,6 +962,8 @@ CREATE TABLE t_ds_worker_group
addr_list text NULL DEFAULT NULL,
create_time datetime NULL DEFAULT NULL,
update_time datetime NULL DEFAULT NULL,
+ description text NULL DEFAULT NULL,
+ other_params_json text NULL DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY name_unique (name)
);
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index ebddafc793..c877fb6fef 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -955,6 +955,8 @@ CREATE TABLE `t_ds_worker_group` (
`addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]',
`create_time` datetime NULL DEFAULT NULL COMMENT 'create time',
`update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
+ `description` text NULL DEFAULT NULL COMMENT 'description',
+ `other_params_json` text NULL DEFAULT NULL COMMENT 'other params json',
PRIMARY KEY (`id`),
UNIQUE KEY `name_unique` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index e4c0923bbb..f7cc1bf121 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -862,6 +862,8 @@ CREATE TABLE t_ds_worker_group (
addr_list text DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
+ description text DEFAULT NULL,
+ other_params_json text DEFAULT NULL,
PRIMARY KEY (id) ,
CONSTRAINT name_unique UNIQUE (name)
) ;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000000..3d0ad0197f
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+ALTER TABLE `t_ds_worker_group` ADD COLUMN `other_params_json` text DEFAULT NULL COMMENT 'other params json';
+
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql
new file mode 100644
index 0000000000..4a14f326b9
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000000..372b99be4c
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+delimiter d//
+
+
+
+CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
+ )
+ RETURNS character varying
+ LANGUAGE 'plpgsql'
+ COST 100
+ VOLATILE PARALLEL UNSAFE
+AS $BODY$
+DECLARE
+v_schema varchar;
+BEGIN
+ ---get schema name
+ v_schema =current_schema();
+
+
+
+--- add column
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json int DEFAULT NULL ';
+
+
+
+return 'Success!';
+exception when others then
+ ---Raise EXCEPTION '(%)',SQLERRM;
+
+ return SQLERRM;
+END;
+$BODY$;
+
+select dolphin_update_metadata();
+
+
+d//
+
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql
new file mode 100644
index 0000000000..4d7327f767
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+delimiter d//
+
+return 'Success!';
+exception when others then
+ ---Raise EXCEPTION '(%)',SQLERRM;
+ return SQLERRM;
+END;
+$BODY$;
+
+select dolphin_insert_dq_initial_data();
+
+d//
+
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index b92cb61473..58df904fc5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -200,23 +200,16 @@ public class ServerNodeManager implements InitializingBean {
public void run() {
try {
// sync worker node info
- Map<String, String> newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true);
- syncAllWorkerNodeInfo(newWorkerNodeInfo);
-
+ Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
+ syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) {
- String workerGroup = wg.getName();
- Set<String> nodes = new HashSet<>();
- String[] addrs = wg.getAddrList().split(Constants.COMMA);
- for (String addr : addrs) {
- if (newWorkerNodeInfo.containsKey(addr)) {
- nodes.add(addr);
- }
- }
- if (!nodes.isEmpty()) {
- syncWorkerGroupNodes(workerGroup, nodes);
+ String workerGroupName = wg.getName();
+ Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
+ if (!workerAddress.isEmpty()) {
+ syncWorkerGroupNodes(workerGroupName, workerAddress);
}
}
}
@@ -227,6 +220,18 @@ public class ServerNodeManager implements InitializingBean {
}
}
+
+ protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
+ Set<String> nodes = new HashSet<>();
+ String[] addrs = wg.getAddrList().split(Constants.COMMA);
+ for (String addr : addrs) {
+ if (newWorkerNodeInfo.containsKey(addr)) {
+ nodes.add(addr);
+ }
+ }
+ return nodes;
+ }
+
/**
* worker group node listener
*/