You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/16 09:05:08 UTC
[inlong] branch master updated: [INLONG-6101][Manager] Support updating and deleting inlong cluster by key (#6102)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7d852bf35 [INLONG-6101][Manager] Support updating and deleting inlong cluster by key (#6102)
7d852bf35 is described below
commit 7d852bf3577bb858fe56913c54c9da52401fee9e
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Sun Oct 16 17:05:04 2022 +0800
[INLONG-6101][Manager] Support updating and deleting inlong cluster by key (#6102)
---
.../api/inner/client/InlongClusterClient.java | 31 ++++++++
.../client/api/service/InlongClusterApi.java | 8 +++
.../service/cluster/InlongClusterService.java | 20 ++++++
.../service/cluster/InlongClusterServiceImpl.java | 67 +++++++++++++++++
.../service/cluster/InlongClusterServiceTest.java | 84 ++++++++++++++++++++++
.../web/controller/InlongClusterController.java | 24 +++++++
6 files changed, 234 insertions(+)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
index 2312125f5..66a730915 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
/**
* Client for {@link InlongClusterApi}.
@@ -161,6 +162,21 @@ public class InlongClusterClient {
return response.getData();
}
+ /**
+ * Update cluster information by unique key.
+ * The request itself, its cluster name and its cluster type should not be null.
+ *
+ * @param request cluster to be modified
+ * @return the update result returned by the manager
+ */
+ public UpdateResult updateByKey(ClusterRequest request) {
+ // check the request first, otherwise a null request fails with a bare NPE on getName()
+ Preconditions.checkNotNull(request, "cluster request should not be null");
+ Preconditions.checkNotNull(request.getName(), "cluster name should not be null");
+ Preconditions.checkNotNull(request.getType(), "cluster type should not be null");
+ Response<UpdateResult> response = ClientUtils.executeHttpCall(inlongClusterApi.updateByKey(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Bind or unbind cluster tag for clusters.
*
@@ -186,6 +202,21 @@ public class InlongClusterClient {
return response.getData();
}
+ /**
+ * Delete cluster by name and type
+ *
+ * @param name cluster name
+ * @param type cluster type
+ * @return wheter succeed
+ */
+ public Boolean deleteByKey(String name, String type) {
+ Preconditions.checkNotNull(name, "cluster name should not be empty");
+ Preconditions.checkNotNull(type, "cluster type should not be empty");
+ Response<Boolean> response = ClientUtils.executeHttpCall(inlongClusterApi.deleteByKey(name, type));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Save cluster node info.
*
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
index cab6dcf3a..fe91f556b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
@@ -28,12 +28,14 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.DELETE;
import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;
+import retrofit2.http.Query;
public interface InlongClusterApi {
@@ -64,12 +66,18 @@ public interface InlongClusterApi {
@POST("cluster/update")
Call<Response<Boolean>> update(@Body ClusterRequest request);
+ /**
+ * Update a cluster by key: sends the request as the body of a POST to "cluster/updateByKey".
+ */
+ @POST("cluster/updateByKey")
+ Call<Response<UpdateResult>> updateByKey(@Body ClusterRequest request);
+
@POST("cluster/bindTag")
Call<Response<Boolean>> bindTag(@Body BindTagRequest request);
@DELETE("cluster/delete/{id}")
Call<Response<Boolean>> delete(@Path("id") Integer id);
+ /**
+ * Delete a cluster by key: sends a DELETE to "cluster/deleteByKey" with the cluster name and type
+ * passed as the "name" and "type" query parameters.
+ */
+ @DELETE("cluster/deleteByKey")
+ Call<Response<Boolean>> deleteByKey(@Query("name") String name, @Query("type") String type);
+
@POST("cluster/node/save")
Call<Response<Integer>> saveNode(@Body ClusterNodeRequest request);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 11ca3120f..890306fc2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import java.util.List;
@@ -127,6 +128,15 @@ public interface InlongClusterService {
*/
Boolean update(ClusterRequest request, String operator);
+ /**
+ * Update cluster information by unique key, i.e. the cluster name and type carried in the request.
+ *
+ * @param request cluster info to be modified
+ * @param operator current operator
+ * @return the update result
+ */
+ UpdateResult updateByKey(ClusterRequest request, String operator);
+
/**
* Bind or unbind cluster tag for clusters.
*
@@ -145,6 +155,16 @@ public interface InlongClusterService {
*/
Boolean delete(Integer id, String operator);
+ /**
+ * Delete the cluster identified by the given cluster name and type.
+ *
+ * @param name cluster name to be deleted
+ * @param type cluster type to be deleted
+ * @param operator current operator
+ * @return whether succeed
+ */
+ Boolean deleteByKey(String name, String type, String operator);
+
/**
* Save cluster node info.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 90ba7c1fa..c6ec71b58 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -388,6 +389,38 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return true;
}
+ /**
+ * Update the cluster located by the name and type of the request.
+ * The request version must equal the stored version, and the operator must either be
+ * in charge of the cluster or be an admin.
+ *
+ * @param request cluster info to be modified, name and type must not be empty
+ * @param operator current operator
+ * @return the cluster id, the success flag and the request version plus one
+ * @throws BusinessException if the cluster is not found or the versions differ
+ */
+ @Override
+ public UpdateResult updateByKey(ClusterRequest request, String operator) {
+ LOGGER.debug("begin to update inlong cluster: {}", request);
+ Preconditions.checkNotNull(request, "inlong cluster info cannot be null");
+ String name = request.getName();
+ String type = request.getType();
+ Preconditions.checkNotEmpty(name, "inlong cluster name cannot be empty");
+ Preconditions.checkNotEmpty(type, "inlong cluster type cannot be empty");
+ InlongClusterEntity entity = clusterMapper.selectByNameAndType(name, type);
+ if (entity == null) {
+ LOGGER.error("inlong cluster not found by name={}, type={}", name, type);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+
+ // refuse the update when the caller holds a different version than the stored one
+ if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+ String errMsg = String.format("cluster has already updated with name=%s, type=%s, curVersion=%s",
+ request.getName(), request.getType(), request.getVersion());
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+ request.setId(entity.getId());
+ UserEntity userEntity = userMapper.selectByName(operator);
+ boolean isInCharge = Preconditions.inSeparatedString(operator, entity.getInCharges(), InlongConstants.COMMA);
+ // null-safe: an unknown user or a missing account type means "not admin" instead of an NPE
+ boolean isAdmin = userEntity != null
+ && Objects.equals(userEntity.getAccountType(), UserTypeEnum.ADMIN.getCode());
+ Preconditions.checkTrue(isInCharge || isAdmin,
+ "Current user does not have permission to update cluster info");
+
+ InlongClusterOperator instance = clusterOperatorFactory.getInstance(request.getType());
+ instance.updateOpt(request, operator);
+ LOGGER.info("success to update inlong cluster: {} by {}", request, operator);
+ return new UpdateResult(entity.getId(), true, request.getVersion() + 1);
+ }
+
@Override
public Boolean bindTag(BindTagRequest request, String operator) {
LOGGER.info("begin to bind or unbind cluster tag: {}", request);
@@ -426,6 +459,40 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return true;
}
+ /**
+ * Logically delete the cluster identified by name and type.
+ * The operator must either be in charge of the cluster or be an admin, and the cluster
+ * must not have any remaining nodes.
+ *
+ * @param name cluster name to be deleted, must not be empty
+ * @param type cluster type to be deleted, must not be empty
+ * @param operator current operator
+ * @return true on success, false if the cluster was not found or was already deleted
+ * @throws BusinessException if nodes still exist under the cluster or the update affects no row
+ */
+ @Override
+ public Boolean deleteByKey(String name, String type, String operator) {
+ // validate both keys; the original checked name twice and never checked type
+ Preconditions.checkNotEmpty(name, "cluster name should not be empty or null");
+ Preconditions.checkNotEmpty(type, "cluster type should not be empty or null");
+ InlongClusterEntity entity = clusterMapper.selectByNameAndType(name, type);
+ if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
+ LOGGER.error("inlong cluster not found by clusterName={}, type={} or was already deleted",
+ name, type);
+ return false;
+ }
+ UserEntity userEntity = userMapper.selectByName(operator);
+ boolean isInCharge = Preconditions.inSeparatedString(operator, entity.getInCharges(), InlongConstants.COMMA);
+ // null-safe: an unknown user or a missing account type means "not admin" instead of an NPE
+ boolean isAdmin = userEntity != null
+ && Objects.equals(userEntity.getAccountType(), UserTypeEnum.ADMIN.getCode());
+ Preconditions.checkTrue(isInCharge || isAdmin,
+ "Current user does not have permission to delete cluster info");
+
+ List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(entity.getId(), null);
+ if (CollectionUtils.isNotEmpty(nodeEntities)) {
+ String errMsg = String.format("there are undeleted nodes under the cluster [%s], "
+ + "please delete the node first", entity.getName());
+ throw new BusinessException(errMsg);
+ }
+
+ // the deleted flag is set to the entity's own id
+ entity.setIsDeleted(entity.getId());
+ entity.setModifier(operator);
+ if (InlongConstants.AFFECTED_ONE_ROW != clusterMapper.updateById(entity)) {
+ LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}", entity.getName(),
+ entity.getType(), entity.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ LOGGER.info("success to delete inlong cluster for clusterName={}, type={} by user={}",
+ name, type, operator);
+ return true;
+ }
+
@Override
public Boolean delete(Integer id, String operator) {
Preconditions.checkNotNull(id, "cluster id cannot be empty");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index baba6d294..f10106ee9 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -24,6 +24,7 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.consts.ProtocolType;
import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
@@ -32,6 +33,7 @@ import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
@@ -106,6 +108,23 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
return clusterService.update(request, GLOBAL_OPERATOR);
}
+ /**
+ * Build a Pulsar cluster request from the given values and update the cluster by its unique key,
+ * acting as the global operator.
+ */
+ public UpdateResult updatePulsarClusterByKey(String name, String clusterTag, String adminUrl, Integer version) {
+ PulsarClusterRequest pulsarRequest = new PulsarClusterRequest();
+ // key and version first, then the values to be changed
+ pulsarRequest.setName(name);
+ pulsarRequest.setVersion(version);
+ pulsarRequest.setInCharges(GLOBAL_OPERATOR);
+ pulsarRequest.setClusterTags(clusterTag);
+ pulsarRequest.setAdminUrl(adminUrl);
+ UpdateResult result = clusterService.updateByKey(pulsarRequest, GLOBAL_OPERATOR);
+ return result;
+ }
+
/**
* Delete cluster info by id.
*/
@@ -113,6 +132,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
return clusterService.delete(id, GLOBAL_OPERATOR);
}
+ /**
+ * Delete cluster info by name and type.
+ */
+ public Boolean deleteClusterByKey(String name, String type) {
+ return clusterService.deleteByKey(name, type, GLOBAL_OPERATOR);
+ }
+
/**
* Save cluster node info.
*/
@@ -222,6 +248,64 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Assertions.assertTrue(success);
}
+ /**
+ * Save a Pulsar cluster, update it by unique key, manage one node under it,
+ * and finally delete the cluster by unique key.
+ */
+ @Test
+ public void testPulsarClusterByKey() {
+ // save cluster
+ String clusterName = "default_pulsar";
+ String clusterTag = "default_cluster";
+ String adminUrl = "http://127.0.0.1:8080";
+ Integer id = this.savePulsarCluster(clusterTag, clusterName, adminUrl);
+ Assertions.assertNotNull(id);
+
+ // list cluster
+ PageResult<ClusterInfo> listCluster = this.listCluster(ClusterType.PULSAR, clusterTag);
+ Assertions.assertTrue(listCluster.getList().size() > 0);
+ ClusterInfo clusterInfo = listCluster.getList().get(0);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ Assertions.assertEquals(adminUrl, pulsarCluster.getAdminUrl());
+
+ // update cluster by unique key
+ String clusterTagUpdate = "default_cluster_2";
+ String adminUrlUpdate = "http://127.0.0.1:8088";
+ UpdateResult updateResult = this.updatePulsarClusterByKey(clusterName, clusterTagUpdate, adminUrlUpdate,
+ pulsarCluster.getVersion());
+ Assertions.assertTrue(updateResult.getSuccess());
+ Assertions.assertEquals(pulsarCluster.getVersion() + 1, updateResult.getVersion());
+ ClusterInfo afterUpdate = clusterService.getOne(clusterTagUpdate, clusterName, ClusterType.PULSAR);
+ Assertions.assertNotNull(afterUpdate);
+ PulsarClusterInfo pulsarClusterAfterUpdate = (PulsarClusterInfo) afterUpdate;
+ // expected value goes first so a failure message reports the values the right way round
+ Assertions.assertEquals(adminUrlUpdate, pulsarClusterAfterUpdate.getAdminUrl());
+
+ // save cluster node
+ Integer parentId = id;
+ String ip = "127.0.0.1";
+ Integer port = 8080;
+ Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port, ProtocolType.HTTP);
+ Assertions.assertNotNull(nodeId);
+
+ // list cluster node
+ PageResult<ClusterNodeResponse> listNode = this.listClusterNode(ClusterType.PULSAR, ip, id);
+ Assertions.assertEquals(1, listNode.getTotal());
+
+ // update cluster node
+ String ipUpdate = "localhost";
+ Integer portUpdate = 8083;
+ Integer version = listNode.getList().get(0).getVersion();
+ Boolean updateNodeSuccess = this.updateClusterNode(nodeId, parentId, ipUpdate, portUpdate, version);
+ Assertions.assertTrue(updateNodeSuccess);
+
+ // delete cluster node, the cluster cannot be deleted while nodes remain under it
+ Boolean deleteNodeSuccess = this.deleteClusterNode(nodeId);
+ Assertions.assertTrue(deleteNodeSuccess);
+
+ // delete cluster by unique key
+ Boolean success = this.deleteClusterByKey(clusterName, ClusterType.PULSAR);
+ Assertions.assertTrue(success);
+ Assertions.assertThrows(BusinessException.class,
+ () -> clusterService.getOne(clusterTagUpdate, clusterName, ClusterType.PULSAR));
+ }
+
@Test
public void testGetDataProxyIp() {
String clusterTag = "default_cluster";
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 99a529593..f7752c0b7 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -33,6 +34,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.operationlog.OperationLog;
@@ -47,6 +49,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -132,6 +135,14 @@ public class InlongClusterController {
return Response.success(clusterService.update(request, username));
}
+ @PostMapping(value = "/cluster/updateByKey")
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Update cluster by key")
+ public Response<UpdateResult> updateByKey(@RequestBody ClusterRequest request) {
+ // the logged-in user is passed on as the operator of the update
+ UpdateResult result = clusterService.updateByKey(request, LoginUserUtils.getLoginUser().getName());
+ return Response.success(result);
+ }
+
@PostMapping(value = "/cluster/bindTag")
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Bind or unbind cluster tag")
@@ -149,6 +160,19 @@ public class InlongClusterController {
return Response.success(clusterService.delete(id, LoginUserUtils.getLoginUser().getName()));
}
+ @DeleteMapping(value = "/cluster/deleteByKey")
+ @ApiOperation(value = "Delete cluster by cluster name and type")
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "name", value = "Cluster name", dataTypeClass = String.class, required = true),
+ @ApiImplicitParam(name = "type", value = "Cluster type", dataTypeClass = String.class, required = true),
+ })
+ @RequiresRoles(value = UserRoleCode.ADMIN)
+ public Response<Boolean> deleteByKey(@RequestParam String name, @RequestParam String type) {
+ return Response.success(clusterService.deleteByKey(name, type,
+ LoginUserUtils.getLoginUser().getName()));
+ }
+
@PostMapping(value = "/cluster/node/save")
@ApiOperation(value = "Save cluster node")
@OperationLog(operation = OperationType.CREATE)