You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/16 09:05:08 UTC

[inlong] branch master updated: [INLONG-6101][Manager] Support updating and deleting inlong cluster by key (#6102)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d852bf35 [INLONG-6101][Manager] Support updating and deleting inlong cluster by key (#6102)
7d852bf35 is described below

commit 7d852bf3577bb858fe56913c54c9da52401fee9e
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Sun Oct 16 17:05:04 2022 +0800

    [INLONG-6101][Manager] Support updating and deleting inlong cluster by key (#6102)
---
 .../api/inner/client/InlongClusterClient.java      | 31 ++++++++
 .../client/api/service/InlongClusterApi.java       |  8 +++
 .../service/cluster/InlongClusterService.java      | 20 ++++++
 .../service/cluster/InlongClusterServiceImpl.java  | 67 +++++++++++++++++
 .../service/cluster/InlongClusterServiceTest.java  | 84 ++++++++++++++++++++++
 .../web/controller/InlongClusterController.java    | 24 +++++++
 6 files changed, 234 insertions(+)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
index 2312125f5..66a730915 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 
 /**
  * Client for {@link InlongClusterApi}.
@@ -161,6 +162,21 @@ public class InlongClusterClient {
         return response.getData();
     }
 
+    /**
+     * Update cluster information by unique key.
+     * Cluster name and type should not be null.
+     *
+     * @param request cluster to be modified
+     * @return whether succeed
+     */
+    public UpdateResult updateByKey(ClusterRequest request) {
+        Preconditions.checkNotNull(request.getName(), "cluster name should not be null");
+        Preconditions.checkNotNull(request.getType(), "cluster type should not be null");
+        Response<UpdateResult> response = ClientUtils.executeHttpCall(inlongClusterApi.updateByKey(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Bind or unbind cluster tag for clusters.
      *
@@ -186,6 +202,21 @@ public class InlongClusterClient {
         return response.getData();
     }
 
+    /**
+     * Delete cluster by name and type
+     *
+     * @param name cluster name
+     * @param type cluster type
+     * @return whether succeed
+     */
+    public Boolean deleteByKey(String name, String type) {
+        Preconditions.checkNotNull(name, "cluster name should not be empty");
+        Preconditions.checkNotNull(type, "cluster type should not be empty");
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongClusterApi.deleteByKey(name, type));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Save cluster node info.
      *
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
index cab6dcf3a..fe91f556b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
@@ -28,12 +28,14 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import retrofit2.Call;
 import retrofit2.http.Body;
 import retrofit2.http.DELETE;
 import retrofit2.http.GET;
 import retrofit2.http.POST;
 import retrofit2.http.Path;
+import retrofit2.http.Query;
 
 public interface InlongClusterApi {
 
@@ -64,12 +66,18 @@ public interface InlongClusterApi {
     @POST("cluster/update")
     Call<Response<Boolean>> update(@Body ClusterRequest request);
 
+    @POST("cluster/updateByKey")
+    Call<Response<UpdateResult>> updateByKey(@Body ClusterRequest request);
+
     @POST("cluster/bindTag")
     Call<Response<Boolean>> bindTag(@Body BindTagRequest request);
 
     @DELETE("cluster/delete/{id}")
     Call<Response<Boolean>> delete(@Path("id") Integer id);
 
+    @DELETE("cluster/deleteByKey")
+    Call<Response<Boolean>> deleteByKey(@Query("name") String name, @Query("type") String type);
+
     @POST("cluster/node/save")
     Call<Response<Integer>> saveNode(@Body ClusterNodeRequest request);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 11ca3120f..890306fc2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 
 import java.util.List;
 
@@ -127,6 +128,15 @@ public interface InlongClusterService {
      */
     Boolean update(ClusterRequest request, String operator);
 
+    /**
+     * Update cluster information by unique key
+     *
+     * @param request cluster info to be modified
+     * @param operator current operator
+     * @return Update Result
+     */
+    UpdateResult updateByKey(ClusterRequest request, String operator);
+
     /**
      * Bind or unbind cluster tag for clusters.
      *
@@ -145,6 +155,16 @@ public interface InlongClusterService {
      */
     Boolean delete(Integer id, String operator);
 
+    /**
+     * Delete cluster by cluster name and type
+     *
+     * @param name cluster name to be deleted
+     * @param type cluster type to be deleted
+     * @param operator current operator
+     * @return whether succeed
+     */
+    Boolean deleteByKey(String name, String type, String operator);
+
     /**
      * Save cluster node info.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 90ba7c1fa..c6ec71b58 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -388,6 +389,38 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return true;
     }
 
+    @Override
+    public UpdateResult updateByKey(ClusterRequest request, String operator) {
+        LOGGER.debug("begin to update inlong cluster: {}", request);
+        Preconditions.checkNotNull(request, "inlong cluster info cannot be null");
+        String name = request.getName();
+        String type = request.getType();
+        Preconditions.checkNotEmpty(name, "inlong cluster name cannot be empty");
+        Preconditions.checkNotEmpty(type, "inlong cluster type cannot be empty");
+        InlongClusterEntity entity = clusterMapper.selectByNameAndType(name, type);
+        if (entity == null) {
+            LOGGER.error("inlong cluster not found by name={}, type={}", name, type);
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+        }
+
+        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+            String errMsg = String.format("cluster has already updated with name=%s, type=%s, curVersion=%s",
+                    request.getName(), request.getType(), request.getVersion());
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+        request.setId(entity.getId());
+        UserEntity userEntity = userMapper.selectByName(operator);
+        boolean isInCharge = Preconditions.inSeparatedString(operator, entity.getInCharges(), InlongConstants.COMMA);
+        Preconditions.checkTrue(isInCharge || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()),
+                "Current user does not have permission to update cluster info");
+
+        InlongClusterOperator instance = clusterOperatorFactory.getInstance(request.getType());
+        instance.updateOpt(request, operator);
+        LOGGER.info("success to update inlong cluster: {} by {}", request, operator);
+        return new UpdateResult(entity.getId(), true, request.getVersion() + 1);
+    }
+
     @Override
     public Boolean bindTag(BindTagRequest request, String operator) {
         LOGGER.info("begin to bind or unbind cluster tag: {}", request);
@@ -426,6 +459,40 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return true;
     }
 
+    @Override
+    public Boolean deleteByKey(String name, String type, String operator) {
+        Preconditions.checkNotEmpty(name, "cluster name should not be empty or null"); // key part 1
+        Preconditions.checkNotEmpty(type, "cluster type should not be empty or null"); // key part 2
+        InlongClusterEntity entity = clusterMapper.selectByNameAndType(name, type);
+        if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
+            LOGGER.error("inlong cluster not found by clusterName={}, type={} or was already deleted",
+                    name, type);
+            return false;
+        }
+        UserEntity userEntity = userMapper.selectByName(operator);
+        boolean isInCharge = Preconditions.inSeparatedString(operator, entity.getInCharges(), InlongConstants.COMMA);
+        Preconditions.checkTrue(isInCharge || userEntity.getAccountType().equals(UserTypeEnum.ADMIN.getCode()),
+                "Current user does not have permission to delete cluster info");
+
+        List<InlongClusterNodeEntity> nodeEntities = clusterNodeMapper.selectByParentId(entity.getId(), null);
+        if (CollectionUtils.isNotEmpty(nodeEntities)) {
+            String errMsg = String.format("there are undeleted nodes under the cluster [%s], "
+                    + "please delete the node first", entity.getName());
+            throw new BusinessException(errMsg);
+        }
+
+        entity.setIsDeleted(entity.getId()); // soft delete: any value > UN_DELETED counts as deleted
+        entity.setModifier(operator);
+        if (InlongConstants.AFFECTED_ONE_ROW != clusterMapper.updateById(entity)) {
+            LOGGER.error("cluster has already updated with name={}, type={}, curVersion={}", entity.getName(),
+                    entity.getType(), entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+        LOGGER.info("success to delete inlong cluster for clusterName={}, type={} by user={}",
+                name, type, operator);
+        return true;
+    }
+
     @Override
     public Boolean delete(Integer id, String operator) {
         Preconditions.checkNotNull(id, "cluster id cannot be empty");
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index baba6d294..f10106ee9 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -24,6 +24,7 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.consts.ProtocolType;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
@@ -32,6 +33,7 @@ import org.apache.inlong.manager.pojo.cluster.dataproxy.DataProxyClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
@@ -106,6 +108,23 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         return clusterService.update(request, GLOBAL_OPERATOR);
     }
 
+    /**
+     * Update cluster by unique key.
+     */
+    public UpdateResult updatePulsarClusterByKey(
+            String name,
+            String clusterTag,
+            String adminUrl,
+            Integer version) {
+        PulsarClusterRequest request = new PulsarClusterRequest();
+        request.setName(name);
+        request.setClusterTags(clusterTag);
+        request.setAdminUrl(adminUrl);
+        request.setInCharges(GLOBAL_OPERATOR);
+        request.setVersion(version);
+        return clusterService.updateByKey(request, GLOBAL_OPERATOR);
+    }
+
     /**
      * Delete cluster info by id.
      */
@@ -113,6 +132,13 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         return clusterService.delete(id, GLOBAL_OPERATOR);
     }
 
+    /**
+     * Delete cluster info by name and type.
+     */
+    public Boolean deleteClusterByKey(String name, String type) {
+        return clusterService.deleteByKey(name, type, GLOBAL_OPERATOR);
+    }
+
     /**
      * Save cluster node info.
      */
@@ -222,6 +248,64 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Assertions.assertTrue(success);
     }
 
+    @Test
+    public void testPulsarClusterByKey() {
+        // save cluster
+        String clusterName = "default_pulsar";
+        String clusterTag = "default_cluster";
+        String adminUrl = "http://127.0.0.1:8080";
+        Integer id = this.savePulsarCluster(clusterTag, clusterName, adminUrl);
+        Assertions.assertNotNull(id);
+
+        // list cluster
+        PageResult<ClusterInfo> listCluster = this.listCluster(ClusterType.PULSAR, clusterTag);
+        Assertions.assertTrue(listCluster.getList().size() > 0);
+        ClusterInfo clusterInfo = listCluster.getList().get(0);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        Assertions.assertEquals(adminUrl, pulsarCluster.getAdminUrl());
+
+        // update cluster by unique key
+        String clusterTagUpdate = "default_cluster_2";
+        String adminUrlUpdate = "http://127.0.0.1:8088";
+        UpdateResult updateResult = this.updatePulsarClusterByKey(clusterName, clusterTagUpdate, adminUrlUpdate,
+                pulsarCluster.getVersion());
+        Assertions.assertTrue(updateResult.getSuccess());
+        Assertions.assertEquals(pulsarCluster.getVersion() + 1, updateResult.getVersion());
+        ClusterInfo afterUpdate = clusterService.getOne(clusterTagUpdate, clusterName, ClusterType.PULSAR);
+        Assertions.assertNotNull(afterUpdate);
+        PulsarClusterInfo pulsarClusterAfterUpdate = (PulsarClusterInfo) afterUpdate;
+        Assertions.assertEquals(pulsarClusterAfterUpdate.getAdminUrl(), adminUrlUpdate);
+
+        // save cluster node
+        Integer parentId = id;
+        String ip = "127.0.0.1";
+        Integer port = 8080;
+        Integer nodeId = this.saveClusterNode(parentId, ClusterType.PULSAR, ip, port, ProtocolType.HTTP);
+        Assertions.assertNotNull(nodeId);
+
+        // list cluster node
+        PageResult<ClusterNodeResponse> listNode = this.listClusterNode(ClusterType.PULSAR, ip, id);
+        Assertions.assertEquals(listNode.getTotal(), 1);
+
+        // update cluster node
+        String ipUpdate = "localhost";
+        Integer portUpdate = 8083;
+        Integer version = listNode.getList().get(0).getVersion();
+        Boolean updateNodeSuccess = this.updateClusterNode(nodeId, parentId, ipUpdate, portUpdate, version);
+        Assertions.assertTrue(updateNodeSuccess);
+
+        // delete cluster node
+        Boolean deleteNodeSuccess = this.deleteClusterNode(nodeId);
+        Assertions.assertTrue(deleteNodeSuccess);
+
+        // delete cluster by unique key
+        Boolean success = this.deleteClusterByKey(clusterName, ClusterType.PULSAR);
+        Assertions.assertTrue(success);
+        Assertions.assertThrows(BusinessException.class,
+                () -> clusterService.getOne(clusterTagUpdate, clusterName, ClusterType.PULSAR));
+
+    }
+
     @Test
     public void testGetDataProxyIp() {
         String clusterTag = "default_cluster";
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 99a529593..f7752c0b7 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -33,6 +34,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
@@ -47,6 +49,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -132,6 +135,14 @@ public class InlongClusterController {
         return Response.success(clusterService.update(request, username));
     }
 
+    @PostMapping(value = "/cluster/updateByKey")
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Update cluster by key")
+    public Response<UpdateResult> updateByKey(@RequestBody ClusterRequest request) {
+        String username = LoginUserUtils.getLoginUser().getName();
+        return Response.success(clusterService.updateByKey(request, username));
+    }
+
     @PostMapping(value = "/cluster/bindTag")
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Bind or unbind cluster tag")
@@ -149,6 +160,19 @@ public class InlongClusterController {
         return Response.success(clusterService.delete(id, LoginUserUtils.getLoginUser().getName()));
     }
 
+    @DeleteMapping(value = "/cluster/deleteByKey")
+    @ApiOperation(value = "Delete cluster by cluster name and type")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "name", value = "Cluster name", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "type", value = "Cluster type", dataTypeClass = String.class, required = true),
+    })
+    @RequiresRoles(value = UserRoleCode.ADMIN)
+    public Response<Boolean> deleteByKey(@RequestParam String name, @RequestParam String type) {
+        return Response.success(clusterService.deleteByKey(name, type,
+                LoginUserUtils.getLoginUser().getName()));
+    }
+
     @PostMapping(value = "/cluster/node/save")
     @ApiOperation(value = "Save cluster node")
     @OperationLog(operation = OperationType.CREATE)