You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/17 08:55:23 UTC
[inlong] branch master updated: [INLONG-6192][Manager] Clean and reuse code for StreamSink (#6193)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c9c120f6d [INLONG-6192][Manager] Clean and reuse code for StreamSink (#6193)
c9c120f6d is described below
commit c9c120f6dd68c1184d8f81ecb54a3547a0baffb1
Author: healchow <he...@gmail.com>
AuthorDate: Mon Oct 17 16:55:18 2022 +0800
[INLONG-6192][Manager] Clean and reuse code for StreamSink (#6193)
---
.../client/api/inner/client/StreamSinkClient.java | 10 +-
.../manager/client/api/service/StreamSinkApi.java | 20 ++--
.../manager/dao/mapper/StreamSinkEntityMapper.java | 19 +++-
.../resources/mappers/StreamSinkEntityMapper.xml | 12 +-
.../service/core/impl/AuditServiceImpl.java | 2 +-
.../manager/service/sink/StreamSinkService.java | 124 +++++++++++----------
.../service/sink/StreamSinkServiceImpl.java | 122 +++++++-------------
.../manager/service/sink/HiveSinkServiceTest.java | 8 +-
.../web/controller/StreamSinkController.java | 10 +-
9 files changed, 148 insertions(+), 179 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index c902c2e96..3b6764eb5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -41,7 +41,7 @@ public class StreamSinkClient {
}
public Integer createSink(SinkRequest sinkRequest) {
- Response<Integer> response = ClientUtils.executeHttpCall(streamSinkApi.createSink(sinkRequest));
+ Response<Integer> response = ClientUtils.executeHttpCall(streamSinkApi.save(sinkRequest));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
@@ -51,7 +51,7 @@ public class StreamSinkClient {
*/
public boolean deleteSink(int id) {
Preconditions.checkTrue(id > 0, "sinkId is illegal");
- Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.deleteSink(id));
+ Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.deleteById(id));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
@@ -68,7 +68,7 @@ public class StreamSinkClient {
*/
public List<StreamSink> listSinks(String groupId, String streamId, String sinkType) {
Response<PageResult<StreamSink>> response = ClientUtils.executeHttpCall(
- streamSinkApi.listSinks(groupId, streamId, sinkType));
+ streamSinkApi.list(groupId, streamId, sinkType));
ClientUtils.assertRespSuccess(response);
return response.getData().getList();
}
@@ -77,7 +77,7 @@ public class StreamSinkClient {
* Update the stream sink info.
*/
public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
- Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateSink(sinkRequest));
+ Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest));
ClientUtils.assertRespSuccess(responseBody);
if (responseBody.getData() != null) {
@@ -91,7 +91,7 @@ public class StreamSinkClient {
* Get detail information of data sink.
*/
public StreamSink getSinkInfo(Integer sinkId) {
- Response<StreamSink> response = ClientUtils.executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
+ Response<StreamSink> response = ClientUtils.executeHttpCall(streamSinkApi.get(sinkId));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 505073dd6..323cb75f1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -33,28 +33,26 @@ import retrofit2.http.Query;
public interface StreamSinkApi {
@POST("sink/save")
- Call<Response<Integer>> createSink(@Body SinkRequest request);
+ Call<Response<Integer>> save(@Body SinkRequest request);
@POST("sink/update")
- Call<Response<Boolean>> updateSink(@Body SinkRequest request);
+ Call<Response<Boolean>> updateById(@Body SinkRequest request);
@POST("sink/updateByKey")
- Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request);
+ Call<Response<UpdateResult>> updateByKey(@Body SinkRequest request);
@DELETE("sink/delete/{id}")
- Call<Response<Boolean>> deleteSink(@Path("id") Integer id);
+ Call<Response<Boolean>> deleteById(@Path("id") Integer id);
@DELETE("sink/deleteByKey")
- Call<Response<Boolean>> deleteSink(
- @Query("groupId") String groupId,
- @Query("streamId") String streamId,
+ Call<Response<Boolean>> deleteByKey(@Query("groupId") String groupId, @Query("streamId") String streamId,
@Query("name") String name);
+ @GET("sink/get/{id}")
+ Call<Response<StreamSink>> get(@Path("id") Integer sinkId);
+
@GET("sink/list")
- Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId") String groupId,
+ Call<Response<PageResult<StreamSink>>> list(@Query("inlongGroupId") String groupId,
@Query("inlongStreamId") String streamId, @Query("sinkType") String sinkType);
- @GET("sink/get/{id}")
- Call<Response<StreamSink>> getSinkInfo(@Path("id") Integer sinkId);
-
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 9999371ff..2662e9c0a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -64,12 +64,21 @@ public interface StreamSinkEntityMapper {
/**
* Query valid sink list by the given group id and stream id.
*
- * @param groupId Inlong group id.
- * @param streamId Inlong stream id.
- * @param sinkName Stream sink name.
- * @return Sink entity list.
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return stream sink entity list
+ */
+ List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+ /**
+ * Query stream sink by the unique key.
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @param sinkName stream sink name
+ * @return stream sink entity
*/
- List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+ StreamSinkEntity selectByUniqueKey(@Param("groupId") String groupId, @Param("streamId") String streamId,
@Param("sinkName") String sinkName);
/**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 5d1d3ea25..12e3bc6c0 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -267,11 +267,17 @@
<if test="streamId != null and streamId != ''">
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
- <if test="sinkName != null and sinkName != ''">
- and sink_name = #{sinkName, jdbcType=VARCHAR}
- </if>
</where>
</select>
+ <select id="selectByUniqueKey" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_sink
+ where is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ and sink_name = #{sinkName, jdbcType=VARCHAR}
+ </select>
<select id="selectByIdAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
select
<include refid="Base_Column_List"/>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 3a5fe1480..c5061ec97 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -114,7 +114,7 @@ public class AuditServiceImpl implements AuditService {
// for now, we use the first sink type only.
// this is temporary behavior before multiple sinks in one stream is fully supported.
- List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId, null);
+ List<StreamSinkEntity> sinkEntityList = sinkEntityMapper.selectByRelatedId(groupId, streamId);
String sinkNodeType = null;
if (CollectionUtils.isNotEmpty(sinkEntityList)) {
sinkNodeType = sinkEntityList.get(0).getSinkType();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 5ce554c85..f5c1894dd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -36,37 +37,39 @@ import java.util.Map;
public interface StreamSinkService {
/**
- * Save the sink information.
+ * Save the sink info.
*
- * @param request Sink request.
- * @param operator Operator's name.
- * @return Sink id after saving.
+ * @param request sink request need to save
+ * @param operator name of operator
+ * @return sink id after saving
*/
Integer save(SinkRequest request, String operator);
/**
- * Query sink information based on id.
+ * Get stream sink info based on id.
*
- * @param id Sink id.
- * @return Sink info.
+ * @param id sink id
+ * @return detail of stream sink info
*/
StreamSink get(Integer id);
/**
- * Query sink information based on inlong group id and inlong stream id.
+ * List the stream sinks based on inlong group id and inlong stream id.
*
- * @param groupId Inlong group id.
- * @param streamId Inlong stream id, can be null.
- * @return Sink info list.
+ * @param groupId inlong group id
+ * @param streamId inlong stream id, can be null
+ * @return sink info list
*/
- List<StreamSink> listSink(String groupId, String streamId);
+ List<StreamSink> listSink(String groupId, @Nullable String streamId);
/**
- * Query sink summary based on inlong group id and inlong stream id, including sink cluster.
+ * Query sink brief info based on inlong group id and inlong stream id.
+ * <p/>
+ * The result will include sink cluster info.
*
- * @param groupId Inlong group id.
- * @param streamId Inlong stream id.
- * @return Sink info list.
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return stream sink brief info list
*/
List<SinkBriefInfo> listBrief(String groupId, String streamId);
@@ -80,84 +83,85 @@ public interface StreamSinkService {
Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);
/**
- * Query the number of undeleted sink info based on inlong group and inlong stream id
+ * Query the number of undeleted sink info based on inlong group and inlong stream id.
*
- * @param groupId Inlong group id.
- * @param streamId Inlong stream id.
- * @return Number of sink info.
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return count of sink info
*/
Integer getCount(String groupId, String streamId);
/**
- * Paging query sink information based on conditions.
+ * Paging query stream sink info based on conditions.
*
- * @param request paging request.
- * @return sink list
+ * @param request paging request
+ * @return sink page list
*/
PageResult<? extends StreamSink> listByCondition(SinkPageRequest request);
/**
- * Modify data sink information.
+ * Modify stream sink info by id.
*
- * @param sinkRequest Information that needs to be modified.
- * @param operator Operator's name.
- * @return Whether succeed.
+ * @param sinkRequest stream sink request that needs to be modified
+ * @param operator name of operator
+ * @return whether succeed
*/
Boolean update(SinkRequest sinkRequest, String operator);
/**
- * Modify data sink information by key.
+ * Modify stream sink info by key.
*
- * @param sinkRequest Information that needs to be modified.
- * @param operator Operator's name.
- * @return Update result.
+ * @param sinkRequest stream sink request that needs to be modified
+ * @param operator name of operator
+ * @return update result
*/
UpdateResult updateByKey(SinkRequest sinkRequest, String operator);
/**
- * Modify sink data status.
+ * Modify stream sink status.
*
- * @param id Sink id.
- * @param status Target status.
- * @param log Modify the log.
+ * @param id stream sink id
+ * @param status target status
+ * @param log log info of this modification
*/
- void updateStatus(int id, int status, String log);
+ void updateStatus(Integer id, int status, String log);
/**
* Delete the stream sink by the given id and sink type.
*
- * @param id The primary key of the sink.
- * @param operator Operator's name.
- * @return Whether succeed
+ * @param id stream sink id
+ * @param operator name of operator
+ * @return whether succeed
*/
Boolean delete(Integer id, String operator);
/**
* Delete the stream sink by given group id, stream id, and sink name.
- * @param groupId The group id of sink
- * @param streamId The stream id of sink
- * @param name The name of sink
- * @return Whether succeed
+ *
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @param name stream sink name
+ * @return whether succeed
*/
Boolean deleteByKey(String groupId, String streamId, String name, String operator);
/**
* Logically delete stream sink with the given conditions.
*
- * @param groupId InLong group id to which the data source belongs.
- * @param streamId InLong stream id to which the data source belongs.
- * @param operator Operator's name.
- * @return Whether succeed.
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @param operator name of operator
+ * @return whether succeed
*/
Boolean logicDeleteAll(String groupId, String streamId, String operator);
/**
* Physically delete stream sink with the given conditions.
*
- * @param groupId InLong group id.
- * @param streamId InLong stream id.
- * @param operator Operator's name.
- * @return Whether succeed.
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @param operator name of operator
+ * @return whether succeed
*/
Boolean deleteAll(String groupId, String streamId, String operator);
@@ -165,27 +169,27 @@ public interface StreamSinkService {
* According to the existing inlong stream ID list, filter out the inlong stream id list
* containing the specified sink type.
*
- * @param groupId Inlong group id.
- * @param sinkType Sink type.
- * @param streamIdList Inlong stream id list.
- * @return List of filtered inlong stream ids.
+ * @param groupId inlong group id
+ * @param sinkType stream sink type
+ * @param streamIdList inlong stream id list
+ * @return list of filtered inlong stream ids
*/
List<String> getExistsStreamIdList(String groupId, String sinkType, List<String> streamIdList);
/**
* According to the inlong stream id, query the list of sink types owned by it
*
- * @param groupId Inlong group id
- * @param streamId Inlong stream id
- * @return List of sink types
+ * @param groupId inlong group id
+ * @param streamId inlong stream id
+ * @return list of sink types
*/
List<String> getSinkTypeList(String groupId, String streamId);
/**
* Save the information modified when the approval is passed
*
- * @param sinkApproveList Stream sink approval information
- * @param operator Operator's name
+ * @param sinkApproveList stream sink approval information
+ * @param operator name of operator
* @return whether succeed
*/
Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4094cf5c8..852484869 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -60,7 +60,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -97,18 +96,18 @@ public class StreamSinkServiceImpl implements StreamSinkService {
String groupId = request.getInlongGroupId();
groupCheckService.checkGroupStatus(groupId, operator);
- // Make sure that there is no sink info with the current groupId and streamId
+ // Make sure that there is no same sink name under the current groupId and streamId
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
// Check whether the stream exist or not
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
- List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
- for (StreamSinkEntity sinkEntity : sinkList) {
- if (sinkEntity != null && Objects.equals(sinkEntity.getSinkName(), sinkName)) {
- String err = "sink name=%s already exists with the groupId=%s streamId=%s";
- throw new BusinessException(String.format(err, sinkName, groupId, streamId));
- }
+
+ // Check whether the sink name exists with the same groupId and streamId
+ StreamSinkEntity exists = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+ if (exists != null && exists.getSinkName().equals(sinkName)) {
+ String err = "sink name=%s already exists with the groupId=%s streamId=%s";
+ throw new BusinessException(String.format(err, sinkName, groupId, streamId));
}
// According to the sink type, save sink information
@@ -163,7 +162,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public List<StreamSink> listSink(String groupId, String streamId) {
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
+ List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
}
@@ -226,7 +225,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(SinkRequest request, String operator) {
- LOGGER.info("begin to update sink info: {}", request);
+ LOGGER.info("begin to update sink by id: {}", request);
this.checkParams(request);
Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
@@ -241,13 +240,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
// Check whether the sink name exists with the same groupId and streamId
- List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
- for (StreamSinkEntity entity : sinkList) {
- Integer sinkId = entity.getId();
- if (!Objects.equals(request.getId(), sinkId) && Objects.equals(entity.getSinkName(), sinkName)) {
- String err = "sink name=%s already exists with the groupId=%s streamId=%s";
- throw new BusinessException(String.format(err, sinkName, groupId, streamId));
- }
+ StreamSinkEntity exists = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+ if (exists != null && !exists.getId().equals(request.getId()) && exists.getSinkName().equals(sinkName)) {
+ String errMsg = "sink name=%s already exists with the groupId=%s streamId=%s";
+ throw new BusinessException(String.format(errMsg, sinkName, groupId, streamId));
}
SinkStatus nextStatus = null;
@@ -267,66 +263,35 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
streamProcessOperation.startProcess(groupId, streamId, operator, false);
}
- LOGGER.info("success to update sink info: {}", request);
+ LOGGER.info("success to update sink by id: {}", request);
return true;
}
@Override
@Transactional(rollbackFor = Throwable.class)
public UpdateResult updateByKey(SinkRequest request, String operator) {
- LOGGER.info("begin to update sink info: {}", request);
- this.checkParams(request);
- // Check if it can be modified
+ LOGGER.info("begin to update sink by key: {}", request);
+
+ // Check whether the sink name exists with the same groupId and streamId, and only one row
String groupId = request.getInlongGroupId();
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
- groupCheckService.checkGroupStatus(groupId, operator);
-
- // Check whether the stream exist or not
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
- Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
-
- // Check whether the sink name exists with the same groupId and streamId, and only one row
- List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
- if (CollectionUtils.isEmpty(sinkList)) {
- String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
+ StreamSinkEntity entity = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+ if (entity == null) {
+ String errMsg = String.format("stream sink not found with groupId=%s, streamId=%s, sinkName=%s",
groupId, streamId, sinkName);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
- if (sinkList.size() != 1) {
- String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
- + "but only except 1", sinkList.size(), groupId, streamId, sinkName);
- LOGGER.error(errMsg);
- throw new BusinessException(errMsg);
- }
-
- StreamSinkEntity entity = sinkList.get(0);
request.setId(entity.getId());
- SinkStatus nextStatus = null;
- boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
- if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
- nextStatus = SinkStatus.CONFIG_ING;
- }
- StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
- sinkOperator.updateOpt(request, nextStatus, operator);
-
- // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
- if (streamSuccess) {
- // To work around the circular reference check we manually instantiate and wire
- if (streamProcessOperation == null) {
- streamProcessOperation = new InlongStreamProcessService();
- autowireCapableBeanFactory.autowireBean(streamProcessOperation);
- }
- streamProcessOperation.startProcess(groupId, streamId, operator, false);
- }
- LOGGER.info("success to update sink info: {}", request);
- return new UpdateResult(entity.getId(), true, request.getVersion() + 1);
+ Boolean result = this.update(request, operator);
+ LOGGER.info("success to update sink by key: {}", request);
+ return new UpdateResult(entity.getId(), result, request.getVersion() + 1);
}
@Override
- public void updateStatus(int id, int status, String log) {
+ public void updateStatus(Integer id, int status, String log) {
StreamSinkEntity entity = new StreamSinkEntity();
entity.setId(id);
entity.setStatus(status);
@@ -336,49 +301,40 @@ public class StreamSinkServiceImpl implements StreamSinkService {
LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
}
- @Transactional(rollbackFor = Throwable.class)
@Override
+ @Transactional(rollbackFor = Throwable.class)
public Boolean delete(Integer id, String operator) {
LOGGER.info("begin to delete sink by id={}", id);
Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+
StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
sinkOperator.deleteOpt(entity, operator);
- LOGGER.info("success to delete sink info: {}", entity);
+ LOGGER.info("success to delete sink by id: {}", entity);
return true;
}
- @Transactional(rollbackFor = Throwable.class)
@Override
+ @Transactional(rollbackFor = Throwable.class)
public Boolean deleteByKey(String groupId, String streamId, String sinkName, String operator) {
- LOGGER.info("begin to delete sink by group id={}, stream id={}, name={}", groupId, streamId, sinkName);
+ LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", groupId, streamId, sinkName);
+
+ // Check whether the sink name exists with the same groupId and streamId
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
Preconditions.checkNotNull(sinkName, "stream sink name is empty or null");
+ StreamSinkEntity entity = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+ Preconditions.checkNotNull(entity, String.format("stream sink not exist by groupId=%s streamId=%s sinkName=%s",
+ groupId, streamId, sinkName));
- // Check whether the sink name exists with the same groupId and streamId, and only one row
- List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
- if (CollectionUtils.isEmpty(sinkList)) {
- String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
- groupId, streamId, sinkName);
- LOGGER.error(errMsg);
- throw new BusinessException(errMsg);
- }
-
- if (sinkList.size() != 1) {
- String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
- + "but only except 1", sinkList.size(), groupId, streamId, sinkName);
- LOGGER.error(errMsg);
- throw new BusinessException(errMsg);
- }
-
- StreamSinkEntity entity = sinkList.get(0);
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+
StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
sinkOperator.deleteOpt(entity, operator);
- LOGGER.info("success to delete sink info: {}", entity);
+ LOGGER.info("success to delete sink by key: {}", entity);
return true;
}
@@ -392,7 +348,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// Check if it can be deleted
groupCheckService.checkGroupStatus(groupId, operator);
- List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
+ List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isNotEmpty(entityList)) {
entityList.forEach(entity -> {
Integer id = entity.getId();
@@ -425,7 +381,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// Check if it can be deleted
groupCheckService.checkGroupStatus(groupId, operator);
- List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
+ List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isNotEmpty(entityList)) {
entityList.forEach(entity -> {
sinkMapper.deleteByPrimaryKey(entity.getId());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 9b1421c83..f2249c00a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
import org.junit.jupiter.api.Assertions;
@@ -72,7 +72,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
}
@Test
- public void testSaveAndDeleteByUniqueKey() {
+ public void testSaveAndDeleteByKey() {
Integer id = this.saveSink();
Assertions.assertNotNull(id);
@@ -106,10 +106,10 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
}
@Test
- public void testGetAndUpdateByUniqueKey() {
+ public void testGetAndUpdateByKey() {
Integer sinkId = this.saveSink();
StreamSink streamSink = sinkService.get(sinkId);
- Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+ Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
HiveSink sink = (HiveSink) streamSink;
sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 74cdf660c..121185692 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -91,8 +91,7 @@ public class StreamSinkController {
@ApiOperation(value = "Delete stream sink")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
public Response<Boolean> delete(@PathVariable Integer id) {
- boolean result = sinkService.delete(id, LoginUserUtils.getLoginUser().getName());
- return Response.success(result);
+ return Response.success(sinkService.delete(id, LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE)
@@ -103,12 +102,9 @@ public class StreamSinkController {
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true)
})
- public Response<Boolean> deleteByKey(
- @RequestParam String groupId,
- @RequestParam String streamId,
+ public Response<Boolean> deleteByKey(@RequestParam String groupId, @RequestParam String streamId,
@RequestParam String name) {
- boolean result = sinkService.deleteByKey(groupId, streamId, name,
- LoginUserUtils.getLoginUser().getName());
+ boolean result = sinkService.deleteByKey(groupId, streamId, name, LoginUserUtils.getLoginUser().getName());
return Response.success(result);
}