You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 13:45:37 UTC
[inlong] 04/04: [INLONG-6487][Manager] Add API to force delete the stream source (#6489)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit d061d9a4e3d03790b6f3349f1dcd0f9dac71d303
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Nov 10 21:29:25 2022 +0800
[INLONG-6487][Manager] Add API to force delete the stream source (#6489)
---
.../inlong/manager/client/api/StreamSource.java | 30 +++++++++++++++
.../manager/client/api/impl/StreamSourceImpl.java | 43 ++++++++++++++++++++++
.../api/inner/client/StreamSourceClient.java | 17 +++++++++
.../client/api/service/StreamSourceApi.java | 4 ++
.../dao/mapper/StreamSourceEntityMapper.java | 3 ++
.../dao/mapper/StreamSourceFieldEntityMapper.java | 11 ++++--
.../resources/mappers/StreamSourceEntityMapper.xml | 17 ++++++++-
.../mappers/StreamSourceFieldEntityMapper.xml | 15 +++++---
.../service/source/StreamSourceService.java | 10 +++++
.../service/source/StreamSourceServiceImpl.java | 14 +++++++
.../web/controller/StreamSourceController.java | 15 +++++++-
11 files changed, 168 insertions(+), 11 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
new file mode 100644
index 000000000..07927efd5
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api;
+
+public interface StreamSource {
+
+ /**
+ * Force deletes the stream source by groupId and streamId
+ *
+ * @param groupId The belongs group id.
+ * @param streamId The belongs stream id.
+ * @return Whether succeed
+ */
+ Boolean forceDelete(String groupId, String streamId);
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java
new file mode 100644
index 000000000..3ed1a0437
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+public class StreamSourceImpl implements StreamSource {
+
+ private final StreamSourceClient sourceClient;
+
+ public StreamSourceImpl(ClientConfiguration configuration) {
+ ClientFactory clientFactory = ClientUtils.getClientFactory(configuration);
+ this.sourceClient = clientFactory.getSourceClient();
+ }
+
+ @Override
+ public Boolean forceDelete(String groupId, String streamId) {
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+ return sourceClient.forceDelete(groupId, streamId);
+ }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
index f1eb4a87d..f98635066 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.service.StreamSourceApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
@@ -88,6 +89,22 @@ public class StreamSourceClient {
return response.getData();
}
+ /**
+ * Force deletes the stream source by groupId and streamId
+ *
+ * @param groupId The belongs group id.
+ * @param streamId The belongs stream id.
+ * @return Whether succeed
+ */
+ public boolean forceDelete(String groupId, String streamId) {
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+ Response<Boolean> response = ClientUtils.executeHttpCall(streamSourceApi.forceDelete(groupId, streamId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
public StreamSource get(int id) {
Preconditions.checkTrue(id > 0, "sourceId is illegal");
Response<StreamSource> response = ClientUtils.executeHttpCall(streamSourceApi.get(id));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
index 5c0fe8c15..33c933486 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
@@ -44,6 +44,10 @@ public interface StreamSourceApi {
@DELETE("source/delete/{id}")
Call<Response<Boolean>> deleteSource(@Path("id") Integer sourceId);
+ @DELETE("source/forceDelete")
+ Call<Response<Boolean>> forceDelete(@Query("inlongGroupId") String groupId,
+ @Query("inlongStreamId") String streamId);
+
@GET("source/get/{id}")
Call<Response<StreamSource>> get(@Path("id") Integer id);
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 6e851e11c..aee466d88 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -105,6 +105,9 @@ public interface StreamSourceEntityMapper {
int updateByPrimaryKeySelective(StreamSourceEntity record);
+ int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+ @Param("status") Integer status);
+
int updateByPrimaryKey(StreamSourceEntity record);
/**
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index 0059e8362..0e67a9f77 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -26,14 +26,12 @@ import java.util.List;
@Repository
public interface StreamSourceFieldEntityMapper {
- int deleteByPrimaryKey(Integer id);
-
int insert(StreamSourceFieldEntity record);
int insertSelective(StreamSourceFieldEntity record);
/**
- * Selete undeleted source field by source id.
+ * Select undeleted source field by source id.
*
* @param sourceId source id
* @return stream source field list
@@ -44,6 +42,13 @@ public interface StreamSourceFieldEntityMapper {
int updateByPrimaryKey(StreamSourceFieldEntity record);
+ /**
+ * Logically delete all stream source fields based on inlong group id and inlong stream id
+ *
+ * @return rows deleted
+ */
+ int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
/**
* Insert all field list
*
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7b1d21071..e1e7f56a8 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -60,7 +60,7 @@
previous_status, creator, modifier)
values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
- #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR},
+ #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR},
#{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR},
#{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
#{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
@@ -271,6 +271,19 @@
</where>
</select>
+ <update id="updateByRelatedId">
+ update stream_source
+ <set>
+ is_deleted = id,
+ previous_status = status,
+ status = #{status, jdbcType=INTEGER},
+ version = version + 1
+ </set>
+ where is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </update>
+
<update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
update stream_source
<set>
@@ -351,7 +364,7 @@
creator = #{creator,jdbcType=VARCHAR},
modifier = #{modifier,jdbcType=VARCHAR}
where id = #{id,jdbcType=INTEGER}
- and version = #{version,jdbcType=INTEGER}
+ and version = #{version,jdbcType=INTEGER}
</update>
<update id="updateStatus">
update stream_source
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index f4ba00366..43cbc1187 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -48,11 +48,7 @@
where source_id = #{sourceId,jdbcType=INTEGER}
and is_deleted = 0
</select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete
- from stream_source_field
- where id = #{id,jdbcType=INTEGER}
- </delete>
+
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
@@ -231,6 +227,15 @@
is_deleted = #{isDeleted,jdbcType=INTEGER}
where id = #{id,jdbcType=INTEGER}
</update>
+ <update id="updateByRelatedId">
+ update stream_source_field
+ <set>
+ is_deleted = id
+ </set>
+ where is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </update>
<insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
insert into stream_source_field (id, inlong_group_id,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index f0940cecb..8037c2994 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -116,6 +116,16 @@ public interface StreamSourceService {
*/
Boolean delete(Integer id, String operator);
+ /**
+ * Force deletes the stream source by groupId and streamId
+ *
+ * @param groupId The belongs group id.
+ * @param streamId The belongs stream id.
+ * @param operator Operator's name
+ * @return Whether succeed
+ */
+ Boolean forceDelete(String groupId, String streamId, String operator);
+
/**
* Delete the stream source by the given id and source type.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2f2214ce1..f3782a7cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -267,6 +267,20 @@ public class StreamSourceServiceImpl implements StreamSourceService {
return true;
}
+ @Override
+ public Boolean forceDelete(String groupId, String streamId, String operator) {
+ LOGGER.info("begin to force delete source for groupId={} and streamId={} by user={}",
+ groupId, streamId, operator);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+ int sourceCount = sourceMapper.updateByRelatedId(groupId, streamId, SourceStatus.SOURCE_DISABLE.getCode());
+ int fieldCount = sourceFieldMapper.updateByRelatedId(groupId, streamId);
+ LOGGER.info("success to force delete source for groupId={} and streamId={} by user={},"
+ + " update {} sources and {} fields", groupId, streamId, operator, sourceCount, fieldCount);
+ return true;
+ }
+
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
isolation = Isolation.READ_COMMITTED)
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index acfdbd1ca..766fd9779 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -36,6 +37,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@@ -85,4 +87,15 @@ public class StreamSourceController {
return Response.success(result);
}
-}
\ No newline at end of file
+ @RequestMapping(value = "/source/forceDelete", method = RequestMethod.DELETE)
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiOperation(value = "Force delete stream source by groupId and streamId")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+ @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
+ })
+ public Response<Boolean> forceDelete(@RequestParam String groupId, @RequestParam String streamId) {
+ return Response.success(sourceService.forceDelete(groupId, streamId, LoginUserUtils.getLoginUser().getName()));
+ }
+
+}