You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/16 09:04:19 UTC

[inlong] branch master updated: [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cd2e2657 [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087)
0cd2e2657 is described below

commit 0cd2e26572cd8f6dc2703aefc415bf9df490317f
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Sun Oct 16 17:04:13 2022 +0800

    [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087)
---
 .../manager/client/api/service/StreamSinkApi.java  | 10 +++
 .../manager/service/sink/StreamSinkService.java    | 19 +++++
 .../service/sink/StreamSinkServiceImpl.java        | 87 ++++++++++++++++++++++
 .../manager/service/sink/HiveSinkServiceTest.java  | 26 +++++++
 .../web/controller/StreamSinkController.java       | 27 +++++++
 5 files changed, 169 insertions(+)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 9324c85f1..505073dd6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.service;
 
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import retrofit2.Call;
@@ -37,9 +38,18 @@ public interface StreamSinkApi {
     @POST("sink/update")
     Call<Response<Boolean>> updateSink(@Body SinkRequest request);
 
+    @POST("sink/updateByKey")
+    Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request);
+
     @DELETE("sink/delete/{id}")
     Call<Response<Boolean>> deleteSink(@Path("id") Integer id);
 
+    @DELETE("sink/deleteByKey")
+    Call<Response<Boolean>> deleteSink(
+            @Query("groupId") String groupId,
+            @Query("streamId") String streamId,
+            @Query("name") String name);
+
     @GET("sink/list")
     Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId") String groupId,
             @Query("inlongStreamId") String streamId, @Query("sinkType") String sinkType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 6b09fd551..5ce554c85 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.sink;
 
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -104,6 +105,15 @@ public interface StreamSinkService {
      */
     Boolean update(SinkRequest sinkRequest, String operator);
 
+    /**
+     * Modify data sink information by key.
+     *
+     * @param sinkRequest Information that needs to be modified.
+     * @param operator Operator's name.
+     * @return Update result.
+     */
+    UpdateResult updateByKey(SinkRequest sinkRequest, String operator);
+
     /**
      * Modify sink data status.
      *
@@ -122,6 +132,15 @@ public interface StreamSinkService {
      */
     Boolean delete(Integer id, String operator);
 
+    /**
+     * Delete the stream sink by given group id, stream id, and sink name.
+     * @param groupId The group id of sink
+     * @param streamId The stream id of sink
+     * @param name The name of sink
+     * @return Whether succeed
+     */
+    Boolean deleteByKey(String groupId, String streamId, String name, String operator);
+
     /**
      * Logically delete stream sink with the given conditions.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index e2572651d..4094cf5c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -270,6 +271,60 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         return true;
     }
 
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public UpdateResult updateByKey(SinkRequest request, String operator) {
+        LOGGER.info("begin to update sink info: {}", request);
+        this.checkParams(request);
+        // Check if it can be modified
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkName = request.getSinkName();
+        groupCheckService.checkGroupStatus(groupId, operator);
+
+        // Check whether the stream exists or not
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
+
+        // The key (groupId, streamId, sinkName) must resolve to exactly one sink row
+        List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+        if (CollectionUtils.isEmpty(sinkList)) {
+            String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
+                    groupId, streamId, sinkName);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        if (sinkList.size() != 1) {
+            String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
+                    + "but only expect 1", sinkList.size(), groupId, streamId, sinkName);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        StreamSinkEntity entity = sinkList.get(0);
+        request.setId(entity.getId());
+        SinkStatus nextStatus = null; // stays null unless the stream is CONFIG_SUCCESSFUL or CONFIG_FAILED
+        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+            nextStatus = SinkStatus.CONFIG_ING;
+        }
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
+        sinkOperator.updateOpt(request, nextStatus, operator);
+
+        // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
+        if (streamSuccess) {
+            // To work around the circular reference check we manually instantiate and wire
+            if (streamProcessOperation == null) {
+                streamProcessOperation = new InlongStreamProcessService();
+                autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+            }
+            streamProcessOperation.startProcess(groupId, streamId, operator, false);
+        }
+        LOGGER.info("success to update sink info: {}", request);
+        return new UpdateResult(entity.getId(), true, request.getVersion() + 1);
+    }
+
     @Override
     public void updateStatus(int id, int status, String log) {
         StreamSinkEntity entity = new StreamSinkEntity();
@@ -295,6 +350,38 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         return true;
     }
 
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean deleteByKey(String groupId, String streamId, String sinkName, String operator) {
+        LOGGER.info("begin to delete sink by group id={}, stream id={}, name={}", groupId, streamId, sinkName);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(sinkName, "stream sink name is empty or null");
+
+        // The key (groupId, streamId, sinkName) must resolve to exactly one sink row
+        List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+        if (CollectionUtils.isEmpty(sinkList)) {
+            String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
+                    groupId, streamId, sinkName);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        if (sinkList.size() != 1) {
+            String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
+                    + "but only expect 1", sinkList.size(), groupId, streamId, sinkName);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        StreamSinkEntity entity = sinkList.get(0);
+        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+        sinkOperator.deleteOpt(entity, operator);
+        LOGGER.info("success to delete sink info: {}", entity);
+        return true;
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 6197d4e81..9b1421c83 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sink;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
@@ -70,6 +71,15 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         Assertions.assertTrue(result);
     }
 
+    @Test
+    public void testSaveAndDeleteByUniqueKey() {
+        Integer id = this.saveSink();
+        Assertions.assertNotNull(id);
+
+        boolean result = sinkService.deleteByKey(globalGroupId, globalStreamId, sinkName, globalOperator);
+        Assertions.assertTrue(result);
+    }
+
     @Test
     public void testListByIdentifier() {
         Integer id = this.saveSink();
@@ -95,4 +105,20 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         sinkService.delete(sinkId, globalOperator);
     }
 
+    @Test
+    public void testGetAndUpdateByUniqueKey() {
+        Integer sinkId = this.saveSink();
+        StreamSink streamSink = sinkService.get(sinkId);
+        Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+
+        HiveSink sink = (HiveSink) streamSink;
+        sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+        HiveSinkRequest request = CommonBeanUtils.copyProperties(sink, HiveSinkRequest::new);
+        UpdateResult result = sinkService.updateByKey(request, globalOperator);
+        Assertions.assertTrue(result.getSuccess());
+        Assertions.assertEquals(request.getVersion() + 1, result.getVersion().intValue());
+
+        sinkService.delete(sinkId, globalOperator);
+    }
+
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index b6a8b3de1..74cdf660c 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -19,11 +19,13 @@ package org.apache.inlong.manager.web.controller;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -36,6 +38,7 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -76,6 +79,13 @@ public class StreamSinkController {
         return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser().getName()));
     }
 
+    @RequestMapping(value = "/sink/updateByKey", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Update stream sink by key")
+    public Response<UpdateResult> updateByKey(@RequestBody SinkRequest request) {
+        return Response.success(sinkService.updateByKey(request, LoginUserUtils.getLoginUser().getName()));
+    }
+
     @RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE)
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete stream sink")
@@ -85,4 +95,21 @@ public class StreamSinkController {
         return Response.success(result);
     }
 
+    @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE)
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiOperation(value = "Delete stream sink by key")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true)
+    })
+    public Response<Boolean> deleteByKey(
+            @RequestParam String groupId,
+            @RequestParam String streamId,
+            @RequestParam String name) {
+        boolean result = sinkService.deleteByKey(groupId, streamId, name,
+                LoginUserUtils.getLoginUser().getName());
+        return Response.success(result);
+    }
+
 }