You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/17 08:55:23 UTC

[inlong] branch master updated: [INLONG-6192][Manager] Clean and reuse code for StreamSink (#6193)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c9c120f6d [INLONG-6192][Manager] Clean and reuse code for StreamSink (#6193)
c9c120f6d is described below

commit c9c120f6dd68c1184d8f81ecb54a3547a0baffb1
Author: healchow <he...@gmail.com>
AuthorDate: Mon Oct 17 16:55:18 2022 +0800

    [INLONG-6192][Manager] Clean and reuse code for StreamSink (#6193)
---
 .../client/api/inner/client/StreamSinkClient.java  |  10 +-
 .../manager/client/api/service/StreamSinkApi.java  |  20 ++--
 .../manager/dao/mapper/StreamSinkEntityMapper.java |  19 +++-
 .../resources/mappers/StreamSinkEntityMapper.xml   |  12 +-
 .../service/core/impl/AuditServiceImpl.java        |   2 +-
 .../manager/service/sink/StreamSinkService.java    | 124 +++++++++++----------
 .../service/sink/StreamSinkServiceImpl.java        | 122 +++++++-------------
 .../manager/service/sink/HiveSinkServiceTest.java  |   8 +-
 .../web/controller/StreamSinkController.java       |  10 +-
 9 files changed, 148 insertions(+), 179 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index c902c2e96..3b6764eb5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -41,7 +41,7 @@ public class StreamSinkClient {
     }
 
     public Integer createSink(SinkRequest sinkRequest) {
-        Response<Integer> response = ClientUtils.executeHttpCall(streamSinkApi.createSink(sinkRequest));
+        Response<Integer> response = ClientUtils.executeHttpCall(streamSinkApi.save(sinkRequest));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
@@ -51,7 +51,7 @@ public class StreamSinkClient {
      */
     public boolean deleteSink(int id) {
         Preconditions.checkTrue(id > 0, "sinkId is illegal");
-        Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.deleteSink(id));
+        Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.deleteById(id));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
@@ -68,7 +68,7 @@ public class StreamSinkClient {
      */
     public List<StreamSink> listSinks(String groupId, String streamId, String sinkType) {
         Response<PageResult<StreamSink>> response = ClientUtils.executeHttpCall(
-                streamSinkApi.listSinks(groupId, streamId, sinkType));
+                streamSinkApi.list(groupId, streamId, sinkType));
         ClientUtils.assertRespSuccess(response);
         return response.getData().getList();
     }
@@ -77,7 +77,7 @@ public class StreamSinkClient {
      * Update the stream sink info.
      */
     public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
-        Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateSink(sinkRequest));
+        Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest));
         ClientUtils.assertRespSuccess(responseBody);
 
         if (responseBody.getData() != null) {
@@ -91,7 +91,7 @@ public class StreamSinkClient {
      * Get detail information of data sink.
      */
     public StreamSink getSinkInfo(Integer sinkId) {
-        Response<StreamSink> response = ClientUtils.executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
+        Response<StreamSink> response = ClientUtils.executeHttpCall(streamSinkApi.get(sinkId));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 505073dd6..323cb75f1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -33,28 +33,26 @@ import retrofit2.http.Query;
 public interface StreamSinkApi {
 
     @POST("sink/save")
-    Call<Response<Integer>> createSink(@Body SinkRequest request);
+    Call<Response<Integer>> save(@Body SinkRequest request);
 
     @POST("sink/update")
-    Call<Response<Boolean>> updateSink(@Body SinkRequest request);
+    Call<Response<Boolean>> updateById(@Body SinkRequest request);
 
     @POST("sink/updateByKey")
-    Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request);
+    Call<Response<UpdateResult>> updateByKey(@Body SinkRequest request);
 
     @DELETE("sink/delete/{id}")
-    Call<Response<Boolean>> deleteSink(@Path("id") Integer id);
+    Call<Response<Boolean>> deleteById(@Path("id") Integer id);
 
     @DELETE("sink/deleteByKey")
-    Call<Response<Boolean>> deleteSink(
-            @Query("groupId") String groupId,
-            @Query("streamId") String streamId,
+    Call<Response<Boolean>> deleteByKey(@Query("groupId") String groupId, @Query("streamId") String streamId,
             @Query("name") String name);
 
+    @GET("sink/get/{id}")
+    Call<Response<StreamSink>> get(@Path("id") Integer sinkId);
+
     @GET("sink/list")
-    Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId") String groupId,
+    Call<Response<PageResult<StreamSink>>> list(@Query("inlongGroupId") String groupId,
             @Query("inlongStreamId") String streamId, @Query("sinkType") String sinkType);
 
-    @GET("sink/get/{id}")
-    Call<Response<StreamSink>> getSinkInfo(@Path("id") Integer sinkId);
-
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 9999371ff..2662e9c0a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -64,12 +64,21 @@ public interface StreamSinkEntityMapper {
     /**
      * Query valid sink list by the given group id and stream id.
      *
-     * @param groupId Inlong group id.
-     * @param streamId Inlong stream id.
-     * @param sinkName Stream sink name.
-     * @return Sink entity list.
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @return stream sink entity list
+     */
+    List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+    /**
+     * Query stream sink by the unique key.
+     *
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @param sinkName stream sink name
+     * @return stream sink entity
      */
-    List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+    StreamSinkEntity selectByUniqueKey(@Param("groupId") String groupId, @Param("streamId") String streamId,
             @Param("sinkName") String sinkName);
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 5d1d3ea25..12e3bc6c0 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -267,11 +267,17 @@
             <if test="streamId != null and streamId != ''">
                 and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
             </if>
-            <if test="sinkName != null and sinkName != ''">
-                and sink_name = #{sinkName, jdbcType=VARCHAR}
-            </if>
         </where>
     </select>
+    <select id="selectByUniqueKey" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_sink
+        where is_deleted = 0
+        and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+        and sink_name = #{sinkName, jdbcType=VARCHAR}
+    </select>
     <select id="selectByIdAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
         select
         <include refid="Base_Column_List"/>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 3a5fe1480..c5061ec97 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -114,7 +114,7 @@ public class AuditServiceImpl implements AuditService {
 
         // for now, we use the first sink type only.
         // this is temporary behavior before multiple sinks in one stream is fully supported.
-        List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId, null);
+        List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId);
         String sinkNodeType = null;
         if (CollectionUtils.isNotEmpty(sinkEntityList)) {
             sinkNodeType = sinkEntityList.get(0).getSinkType();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 5ce554c85..f5c1894dd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -36,37 +37,39 @@ import java.util.Map;
 public interface StreamSinkService {
 
     /**
-     * Save the sink information.
+     * Save the sink info.
      *
-     * @param request Sink request.
-     * @param operator Operator's name.
-     * @return Sink id after saving.
+     * @param request sink request need to save
+     * @param operator name of operator
+     * @return sink id after saving
      */
     Integer save(SinkRequest request, String operator);
 
     /**
-     * Query sink information based on id.
+     * Get stream sink info based on id.
      *
-     * @param id Sink id.
-     * @return Sink info.
+     * @param id sink id
+     * @return detail of stream sink info
      */
     StreamSink get(Integer id);
 
     /**
-     * Query sink information based on inlong group id and inlong stream id.
+     * List the stream sinks based on inlong group id and inlong stream id.
      *
-     * @param groupId Inlong group id.
-     * @param streamId Inlong stream id, can be null.
-     * @return Sink info list.
+     * @param groupId inlong group id
+     * @param streamId inlong stream id, can be null
+     * @return sink info list
      */
-    List<StreamSink> listSink(String groupId, String streamId);
+    List<StreamSink> listSink(String groupId, @Nullable String streamId);
 
     /**
-     * Query sink summary based on inlong group id and inlong stream id, including sink cluster.
+     * Query sink brief info based on inlong group id and inlong stream id.
+     * <p/>
+     * The result will include sink cluster info.
      *
-     * @param groupId Inlong group id.
-     * @param streamId Inlong stream id.
-     * @return Sink info list.
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @return stream sink brief info list
      */
     List<SinkBriefInfo> listBrief(String groupId, String streamId);
 
@@ -80,84 +83,85 @@ public interface StreamSinkService {
     Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);
 
     /**
-     * Query the number of undeleted sink info based on inlong group and inlong stream id
+     * Query the number of undeleted sink info based on inlong group and inlong stream id.
      *
-     * @param groupId Inlong group id.
-     * @param streamId Inlong stream id.
-     * @return Number of sink info.
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @return count of sink info
      */
     Integer getCount(String groupId, String streamId);
 
     /**
-     * Paging query sink information based on conditions.
+     * Paging query stream sink info based on conditions.
      *
-     * @param request paging request.
-     * @return sink list
+     * @param request paging request
+     * @return sink page list
      */
     PageResult<? extends StreamSink> listByCondition(SinkPageRequest request);
 
     /**
-     * Modify data sink information.
+     * Modify stream sink info by id.
      *
-     * @param sinkRequest Information that needs to be modified.
-     * @param operator Operator's name.
-     * @return Whether succeed.
+     * @param sinkRequest stream sink request that needs to be modified
+     * @param operator name of operator
+     * @return whether succeed
      */
     Boolean update(SinkRequest sinkRequest, String operator);
 
     /**
-     * Modify data sink information by key.
+     * Modify stream sink info by key.
      *
-     * @param sinkRequest Information that needs to be modified.
-     * @param operator Operator's name.
-     * @return Update result.
+     * @param sinkRequest stream sink request that needs to be modified
+     * @param operator name of operator
+     * @return update result
      */
     UpdateResult updateByKey(SinkRequest sinkRequest, String operator);
 
     /**
-     * Modify sink data status.
+     * Modify stream sink status.
      *
-     * @param id Sink id.
-     * @param status Target status.
-     * @param log Modify the log.
+     * @param id stream sink id
+     * @param status target status
+     * @param log log info of this modification
      */
-    void updateStatus(int id, int status, String log);
+    void updateStatus(Integer id, int status, String log);
 
     /**
      * Delete the stream sink by the given id and sink type.
      *
-     * @param id The primary key of the sink.
-     * @param operator Operator's name.
-     * @return Whether succeed
+     * @param id stream sink id
+     * @param operator name of operator
+     * @return whether succeed
      */
     Boolean delete(Integer id, String operator);
 
     /**
      * Delete the stream sink by given group id, stream id, and sink name.
-     * @param groupId The group id of sink
-     * @param streamId The stream id of sink
-     * @param name The name of sink
-     * @return Whether succeed
+     *
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @param name stream sink name
+     * @return whether succeed
      */
     Boolean deleteByKey(String groupId, String streamId, String name, String operator);
 
     /**
      * Logically delete stream sink with the given conditions.
      *
-     * @param groupId InLong group id to which the data source belongs.
-     * @param streamId InLong stream id to which the data source belongs.
-     * @param operator Operator's name.
-     * @return Whether succeed.
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @param operator name of operator
+     * @return whether succeed
      */
     Boolean logicDeleteAll(String groupId, String streamId, String operator);
 
     /**
      * Physically delete stream sink with the given conditions.
      *
-     * @param groupId InLong group id.
-     * @param streamId InLong stream id.
-     * @param operator Operator's name.
-     * @return Whether succeed.
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @param operator name of operator
+     * @return whether succeed
      */
     Boolean deleteAll(String groupId, String streamId, String operator);
 
@@ -165,27 +169,27 @@ public interface StreamSinkService {
      * According to the existing inlong stream ID list, filter out the inlong stream id list
      * containing the specified sink type.
      *
-     * @param groupId Inlong group id.
-     * @param sinkType Sink type.
-     * @param streamIdList Inlong stream id list.
-     * @return List of filtered inlong stream ids.
+     * @param groupId inlong group id
+     * @param sinkType stream sink type
+     * @param streamIdList inlong stream id list
+     * @return list of filtered inlong stream ids
      */
     List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList);
 
     /**
      * According to the inlong stream id, query the list of sink types owned by it
      *
-     * @param groupId Inlong group id
-     * @param streamId Inlong stream id
-     * @return List of sink types
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @return list of sink types
      */
     List<String> getSinkTypeList(String groupId, String streamId);
 
     /**
      * Save the information modified when the approval is passed
      *
-     * @param sinkApproveList Stream sink approval information
-     * @param operator Operator's name
+     * @param sinkApproveList stream sink approval information
+     * @param operator name of operator
      * @return whether succeed
      */
     Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4094cf5c8..852484869 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -60,7 +60,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -97,18 +96,18 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         String groupId = request.getInlongGroupId();
         groupCheckService.checkGroupStatus(groupId, operator);
 
-        // Make sure that there is no sink info with the current groupId and streamId
+        // Make sure that there is no same sink name under the current groupId and streamId
         String streamId = request.getInlongStreamId();
         String sinkName = request.getSinkName();
         // Check whether the stream exist or not
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
-        List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
-        for (StreamSinkEntity sinkEntity : sinkList) {
-            if (sinkEntity != null && Objects.equals(sinkEntity.getSinkName(), sinkName)) {
-                String err = "sink name=%s already exists with the groupId=%s streamId=%s";
-                throw new BusinessException(String.format(err, sinkName, groupId, streamId));
-            }
+
+        // Check whether the sink name exists with the same groupId and streamId
+        StreamSinkEntity exists = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+        if (exists != null && exists.getSinkName().equals(sinkName)) {
+            String err = "sink name=%s already exists with the groupId=%s streamId=%s";
+            throw new BusinessException(String.format(err, sinkName, groupId, streamId));
         }
 
         // According to the sink type, save sink information
@@ -163,7 +162,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Override
     public List<StreamSink> listSink(String groupId, String streamId) {
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-        List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
+        List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
         }
@@ -226,7 +225,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean update(SinkRequest request, String operator) {
-        LOGGER.info("begin to update sink info: {}", request);
+        LOGGER.info("begin to update sink by id: {}", request);
         this.checkParams(request);
         Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
 
@@ -241,13 +240,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
 
         // Check whether the sink name exists with the same groupId and streamId
-        List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
-        for (StreamSinkEntity entity : sinkList) {
-            Integer sinkId = entity.getId();
-            if (!Objects.equals(request.getId(), sinkId) && Objects.equals(entity.getSinkName(), sinkName)) {
-                String err = "sink name=%s already exists with the groupId=%s streamId=%s";
-                throw new BusinessException(String.format(err, sinkName, groupId, streamId));
-            }
+        StreamSinkEntity exists = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+        if (exists != null && !exists.getId().equals(request.getId()) && exists.getSinkName().equals(sinkName)) {
+            String errMsg = "sink name=%s already exists with the groupId=%s streamId=%s";
+            throw new BusinessException(String.format(errMsg, sinkName, groupId, streamId));
         }
 
         SinkStatus nextStatus = null;
@@ -267,66 +263,35 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             }
             streamProcessOperation.startProcess(groupId, streamId, operator, false);
         }
-        LOGGER.info("success to update sink info: {}", request);
+        LOGGER.info("success to update sink by id: {}", request);
         return true;
     }
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public UpdateResult updateByKey(SinkRequest request, String operator) {
-        LOGGER.info("begin to update sink info: {}", request);
-        this.checkParams(request);
-        // Check if it can be modified
+        LOGGER.info("begin to update sink by key: {}", request);
+
+        // Check whether the sink name exists with the same groupId and streamId, and only one row
         String groupId = request.getInlongGroupId();
         String streamId = request.getInlongStreamId();
         String sinkName = request.getSinkName();
-        groupCheckService.checkGroupStatus(groupId, operator);
-
-        // Check whether the stream exist or not
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-        Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
-
-        // Check whether the sink name exists with the same groupId and streamId, and only one row
-        List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
-        if (CollectionUtils.isEmpty(sinkList)) {
-            String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
+        StreamSinkEntity entity = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+        if (entity == null) {
+            String errMsg = String.format("stream sink not found with groupId=%s, streamId=%s, sinkName=%s",
                     groupId, streamId, sinkName);
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
 
-        if (sinkList.size() != 1) {
-            String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
-                    + "but only except 1", sinkList.size(), groupId, streamId, sinkName);
-            LOGGER.error(errMsg);
-            throw new BusinessException(errMsg);
-        }
-
-        StreamSinkEntity entity = sinkList.get(0);
         request.setId(entity.getId());
-        SinkStatus nextStatus = null;
-        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
-        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
-            nextStatus = SinkStatus.CONFIG_ING;
-        }
-        StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
-        sinkOperator.updateOpt(request, nextStatus, operator);
-
-        // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
-        if (streamSuccess) {
-            // To work around the circular reference check we manually instantiate and wire
-            if (streamProcessOperation == null) {
-                streamProcessOperation = new InlongStreamProcessService();
-                autowireCapableBeanFactory.autowireBean(streamProcessOperation);
-            }
-            streamProcessOperation.startProcess(groupId, streamId, operator, false);
-        }
-        LOGGER.info("success to update sink info: {}", request);
-        return new UpdateResult(entity.getId(), true, request.getVersion() + 1);
+        Boolean result = this.update(request, operator);
+        LOGGER.info("success to update sink by key: {}", request);
+        return new UpdateResult(entity.getId(), result, request.getVersion() + 1);
     }
 
     @Override
-    public void updateStatus(int id, int status, String log) {
+    public void updateStatus(Integer id, int status, String log) {
         StreamSinkEntity entity = new StreamSinkEntity();
         entity.setId(id);
         entity.setStatus(status);
@@ -336,49 +301,40 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
     }
 
-    @Transactional(rollbackFor = Throwable.class)
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public Boolean delete(Integer id, String operator) {
         LOGGER.info("begin to delete sink by id={}", id);
         Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+
         StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
         sinkOperator.deleteOpt(entity, operator);
-        LOGGER.info("success to delete sink info: {}", entity);
+        LOGGER.info("success to delete sink by id: {}", entity);
         return true;
     }
 
-    @Transactional(rollbackFor = Throwable.class)
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public Boolean deleteByKey(String groupId, String streamId, String sinkName, String operator) {
-        LOGGER.info("begin to delete sink by group id={}, stream id={}, name={}", groupId, streamId, sinkName);
+        LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", groupId, streamId, sinkName);
+
+        // Check whether the sink name exists with the same groupId and streamId
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
         Preconditions.checkNotNull(sinkName, "stream sink name is empty or null");
+        StreamSinkEntity entity = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+        Preconditions.checkNotNull(entity, String.format("stream sink not exist by groupId=%s streamId=%s sinkName=%s",
+                groupId, streamId, sinkName));
 
-        // Check whether the sink name exists with the same groupId and streamId, and only one row
-        List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
-        if (CollectionUtils.isEmpty(sinkList)) {
-            String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
-                    groupId, streamId, sinkName);
-            LOGGER.error(errMsg);
-            throw new BusinessException(errMsg);
-        }
-
-        if (sinkList.size() != 1) {
-            String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
-                    + "but only except 1", sinkList.size(), groupId, streamId, sinkName);
-            LOGGER.error(errMsg);
-            throw new BusinessException(errMsg);
-        }
-
-        StreamSinkEntity entity = sinkList.get(0);
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+
         StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
         sinkOperator.deleteOpt(entity, operator);
-        LOGGER.info("success to delete sink info: {}", entity);
+        LOGGER.info("success to delete sink by key: {}", entity);
         return true;
     }
 
@@ -392,7 +348,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         // Check if it can be deleted
         groupCheckService.checkGroupStatus(groupId, operator);
 
-        List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
+        List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isNotEmpty(entityList)) {
             entityList.forEach(entity -> {
                 Integer id = entity.getId();
@@ -425,7 +381,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         // Check if it can be deleted
         groupCheckService.checkGroupStatus(groupId, operator);
 
-        List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
+        List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isNotEmpty(entityList)) {
             entityList.forEach(entity -> {
                 sinkMapper.deleteByPrimaryKey(entity.getId());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 9b1421c83..f2249c00a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.service.sink;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
 import org.junit.jupiter.api.Assertions;
@@ -72,7 +72,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
     }
 
     @Test
-    public void testSaveAndDeleteByUniqueKey() {
+    public void testSaveAndDeleteByKey() {
         Integer id = this.saveSink();
         Assertions.assertNotNull(id);
 
@@ -106,10 +106,10 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
     }
 
     @Test
-    public void testGetAndUpdateByUniqueKey() {
+    public void testGetAndUpdateByKey() {
         Integer sinkId = this.saveSink();
         StreamSink streamSink = sinkService.get(sinkId);
-         Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+        Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
 
         HiveSink sink = (HiveSink) streamSink;
         sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 74cdf660c..121185692 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -91,8 +91,7 @@ public class StreamSinkController {
     @ApiOperation(value = "Delete stream sink")
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
     public Response<Boolean> delete(@PathVariable Integer id) {
-        boolean result = sinkService.delete(id, LoginUserUtils.getLoginUser().getName());
-        return Response.success(result);
+        return Response.success(sinkService.delete(id, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE)
@@ -103,12 +102,9 @@ public class StreamSinkController {
             @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true),
             @ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true)
     })
-    public Response<Boolean> deleteByKey(
-            @RequestParam String groupId,
-            @RequestParam String streamId,
+    public Response<Boolean> deleteByKey(@RequestParam String groupId, @RequestParam String streamId,
             @RequestParam String name) {
-        boolean result = sinkService.deleteByKey(groupId, streamId, name,
-                LoginUserUtils.getLoginUser().getName());
+        boolean result = sinkService.deleteByKey(groupId, streamId, name, LoginUserUtils.getLoginUser().getName());
         return Response.success(result);
     }