You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/16 09:04:19 UTC
[inlong] branch master updated: [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0cd2e2657 [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087)
0cd2e2657 is described below
commit 0cd2e26572cd8f6dc2703aefc415bf9df490317f
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Sun Oct 16 17:04:13 2022 +0800
[INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087)
---
.../manager/client/api/service/StreamSinkApi.java | 10 +++
.../manager/service/sink/StreamSinkService.java | 19 +++++
.../service/sink/StreamSinkServiceImpl.java | 87 ++++++++++++++++++++++
.../manager/service/sink/HiveSinkServiceTest.java | 26 +++++++
.../web/controller/StreamSinkController.java | 27 +++++++
5 files changed, 169 insertions(+)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 9324c85f1..505073dd6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.service;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import retrofit2.Call;
@@ -37,9 +38,18 @@ public interface StreamSinkApi {
@POST("sink/update")
Call<Response<Boolean>> updateSink(@Body SinkRequest request);
+ @POST("sink/updateByKey")
+ Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request);
+
@DELETE("sink/delete/{id}")
Call<Response<Boolean>> deleteSink(@Path("id") Integer id);
+ @DELETE("sink/deleteByKey")
+ Call<Response<Boolean>> deleteSink(
+ @Query("groupId") String groupId,
+ @Query("streamId") String streamId,
+ @Query("name") String name);
+
@GET("sink/list")
Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId") String groupId,
@Query("inlongStreamId") String streamId, @Query("sinkType") String sinkType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 6b09fd551..5ce554c85 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -104,6 +105,15 @@ public interface StreamSinkService {
*/
Boolean update(SinkRequest sinkRequest, String operator);
+ /**
+ * Modify data sink information by key.
+ *
+ * @param sinkRequest Information that needs to be modified.
+ * @param operator Operator's name.
+ * @return Update result.
+ */
+ UpdateResult updateByKey(SinkRequest sinkRequest, String operator);
+
/**
* Modify sink data status.
*
@@ -122,6 +132,15 @@ public interface StreamSinkService {
*/
Boolean delete(Integer id, String operator);
+ /**
+ * Delete the stream sink by given group id, stream id, and sink name.
+ * @param groupId The group id of sink
+ * @param streamId The stream id of sink
+ * @param name The name of sink
+ * @return Whether succeed
+ */
+ Boolean deleteByKey(String groupId, String streamId, String name, String operator);
+
/**
* Logically delete stream sink with the given conditions.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index e2572651d..4094cf5c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -270,6 +271,60 @@ public class StreamSinkServiceImpl implements StreamSinkService {
return true;
}
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public UpdateResult updateByKey(SinkRequest request, String operator) {
+ LOGGER.info("begin to update sink info: {}", request);
+ this.checkParams(request);
+ // Check if it can be modified
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
+ String sinkName = request.getSinkName();
+ groupCheckService.checkGroupStatus(groupId, operator);
+
+ // Check whether the stream exist or not
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
+
+ // Check whether the sink name exists with the same groupId and streamId, and only one row
+ List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+ if (CollectionUtils.isEmpty(sinkList)) {
+ String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
+ groupId, streamId, sinkName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ if (sinkList.size() != 1) {
+ String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
+ + "but only except 1", sinkList.size(), groupId, streamId, sinkName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ StreamSinkEntity entity = sinkList.get(0);
+ request.setId(entity.getId());
+ SinkStatus nextStatus = null;
+ boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+ if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+ nextStatus = SinkStatus.CONFIG_ING;
+ }
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
+ sinkOperator.updateOpt(request, nextStatus, operator);
+
+ // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
+ if (streamSuccess) {
+ // To work around the circular reference check we manually instantiate and wire
+ if (streamProcessOperation == null) {
+ streamProcessOperation = new InlongStreamProcessService();
+ autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+ }
+ streamProcessOperation.startProcess(groupId, streamId, operator, false);
+ }
+ LOGGER.info("success to update sink info: {}", request);
+ return new UpdateResult(entity.getId(), true, request.getVersion() + 1);
+ }
+
@Override
public void updateStatus(int id, int status, String log) {
StreamSinkEntity entity = new StreamSinkEntity();
@@ -295,6 +350,38 @@ public class StreamSinkServiceImpl implements StreamSinkService {
return true;
}
+ @Transactional(rollbackFor = Throwable.class)
+ @Override
+ public Boolean deleteByKey(String groupId, String streamId, String sinkName, String operator) {
+ LOGGER.info("begin to delete sink by group id={}, stream id={}, name={}", groupId, streamId, sinkName);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(sinkName, "stream sink name is empty or null");
+
+ // Check whether the sink name exists with the same groupId and streamId, and only one row
+ List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+ if (CollectionUtils.isEmpty(sinkList)) {
+ String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s",
+ groupId, streamId, sinkName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ if (sinkList.size() != 1) {
+ String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, "
+ + "but only except 1", sinkList.size(), groupId, streamId, sinkName);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ StreamSinkEntity entity = sinkList.get(0);
+ groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+ sinkOperator.deleteOpt(entity, operator);
+ LOGGER.info("success to delete sink info: {}", entity);
+ return true;
+ }
+
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 6197d4e81..9b1421c83 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
@@ -70,6 +71,15 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
Assertions.assertTrue(result);
}
+ @Test
+ public void testSaveAndDeleteByUniqueKey() {
+ Integer id = this.saveSink();
+ Assertions.assertNotNull(id);
+
+ boolean result = sinkService.deleteByKey(globalGroupId, globalStreamId, sinkName, globalOperator);
+ Assertions.assertTrue(result);
+ }
+
@Test
public void testListByIdentifier() {
Integer id = this.saveSink();
@@ -95,4 +105,20 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
sinkService.delete(sinkId, globalOperator);
}
+ @Test
+ public void testGetAndUpdateByUniqueKey() {
+ Integer sinkId = this.saveSink();
+ StreamSink streamSink = sinkService.get(sinkId);
+ Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+
+ HiveSink sink = (HiveSink) streamSink;
+ sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+ HiveSinkRequest request = CommonBeanUtils.copyProperties(sink, HiveSinkRequest::new);
+ UpdateResult result = sinkService.updateByKey(request, globalOperator);
+ Assertions.assertTrue(result.getSuccess());
+ Assertions.assertEquals(request.getVersion() + 1, result.getVersion().intValue());
+
+ sinkService.delete(sinkId, globalOperator);
+ }
+
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index b6a8b3de1..74cdf660c 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -19,11 +19,13 @@ package org.apache.inlong.manager.web.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -36,6 +38,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -76,6 +79,13 @@ public class StreamSinkController {
return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser().getName()));
}
+ @RequestMapping(value = "/sink/updateByKey", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Update stream sink by key")
+ public Response<UpdateResult> updateByKey(@RequestBody SinkRequest request) {
+ return Response.success(sinkService.updateByKey(request, LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE)
@OperationLog(operation = OperationType.DELETE)
@ApiOperation(value = "Delete stream sink")
@@ -85,4 +95,21 @@ public class StreamSinkController {
return Response.success(result);
}
+ @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE)
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiOperation(value = "Delete stream sink by key")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+ @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true),
+ @ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true)
+ })
+ public Response<Boolean> deleteByKey(
+ @RequestParam String groupId,
+ @RequestParam String streamId,
+ @RequestParam String name) {
+ boolean result = sinkService.deleteByKey(groupId, streamId, name,
+ LoginUserUtils.getLoginUser().getName());
+ return Response.success(result);
+ }
+
}