You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/10 13:45:37 UTC

[inlong] 04/04: [INLONG-6487][Manager] Add API to force delete the stream source (#6489)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d061d9a4e3d03790b6f3349f1dcd0f9dac71d303
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Nov 10 21:29:25 2022 +0800

    [INLONG-6487][Manager] Add API to force delete the stream source (#6489)
---
 .../inlong/manager/client/api/StreamSource.java    | 30 +++++++++++++++
 .../manager/client/api/impl/StreamSourceImpl.java  | 43 ++++++++++++++++++++++
 .../api/inner/client/StreamSourceClient.java       | 17 +++++++++
 .../client/api/service/StreamSourceApi.java        |  4 ++
 .../dao/mapper/StreamSourceEntityMapper.java       |  3 ++
 .../dao/mapper/StreamSourceFieldEntityMapper.java  | 11 ++++--
 .../resources/mappers/StreamSourceEntityMapper.xml | 17 ++++++++-
 .../mappers/StreamSourceFieldEntityMapper.xml      | 15 +++++---
 .../service/source/StreamSourceService.java        | 10 +++++
 .../service/source/StreamSourceServiceImpl.java    | 14 +++++++
 .../web/controller/StreamSourceController.java     | 15 +++++++-
 11 files changed, 168 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
new file mode 100644
index 000000000..07927efd5
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api;
+
+public interface StreamSource {
+
+    /**
+     * Force deletes the stream source by groupId and streamId
+     *
+     * @param groupId The belongs group id.
+     * @param streamId The belongs stream id.
+     * @return Whether succeed
+     */
+    Boolean forceDelete(String groupId, String streamId);
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java
new file mode 100644
index 000000000..3ed1a0437
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+public class StreamSourceImpl implements StreamSource {
+
+    private final StreamSourceClient sourceClient;
+
+    public StreamSourceImpl(ClientConfiguration configuration) {
+        ClientFactory clientFactory = ClientUtils.getClientFactory(configuration);
+        this.sourceClient = clientFactory.getSourceClient();
+    }
+
+    @Override
+    public Boolean forceDelete(String groupId, String streamId) {
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+        return sourceClient.forceDelete(groupId, streamId);
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
index f1eb4a87d..f98635066 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.StreamSourceApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -88,6 +89,22 @@ public class StreamSourceClient {
         return response.getData();
     }
 
+    /**
+     * Force deletes the stream source by groupId and streamId
+     *
+     * @param groupId The belongs group id.
+     * @param streamId The belongs stream id.
+     * @return Whether succeed
+     */
+    public boolean forceDelete(String groupId, String streamId) {
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+        Response<Boolean> response = ClientUtils.executeHttpCall(streamSourceApi.forceDelete(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     public StreamSource get(int id) {
         Preconditions.checkTrue(id > 0, "sourceId is illegal");
         Response<StreamSource> response = ClientUtils.executeHttpCall(streamSourceApi.get(id));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
index 5c0fe8c15..33c933486 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
@@ -44,6 +44,10 @@ public interface StreamSourceApi {
     @DELETE("source/delete/{id}")
     Call<Response<Boolean>> deleteSource(@Path("id") Integer sourceId);
 
+    @DELETE("source/forceDelete")
+    Call<Response<Boolean>> forceDelete(@Query("inlongGroupId") String groupId,
+            @Query("inlongStreamId") String streamId);
+
     @GET("source/get/{id}")
     Call<Response<StreamSource>> get(@Path("id") Integer id);
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 6e851e11c..aee466d88 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -105,6 +105,9 @@ public interface StreamSourceEntityMapper {
 
     int updateByPrimaryKeySelective(StreamSourceEntity record);
 
+    int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+            @Param("status") Integer status);
+
     int updateByPrimaryKey(StreamSourceEntity record);
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index 0059e8362..0e67a9f77 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -26,14 +26,12 @@ import java.util.List;
 @Repository
 public interface StreamSourceFieldEntityMapper {
 
-    int deleteByPrimaryKey(Integer id);
-
     int insert(StreamSourceFieldEntity record);
 
     int insertSelective(StreamSourceFieldEntity record);
 
     /**
-     * Selete undeleted source field by source id.
+     * Select undeleted source field by source id.
      *
      * @param sourceId source id
      * @return stream source field list
@@ -44,6 +42,13 @@ public interface StreamSourceFieldEntityMapper {
 
     int updateByPrimaryKey(StreamSourceFieldEntity record);
 
+    /**
+     * Logically delete all stream source fields based on inlong group id and inlong stream id
+     *
+     * @return rows deleted
+     */
+    int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
     /**
      * Insert all field list
      *
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7b1d21071..e1e7f56a8 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -60,7 +60,7 @@
                                    previous_status, creator, modifier)
         values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
-                #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR},
+                #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR},
                 #{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR},
                 #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
@@ -271,6 +271,19 @@
         </where>
     </select>
 
+    <update id="updateByRelatedId">
+        update stream_source
+        <set>
+            is_deleted = id,
+            previous_status = status,
+            status = #{status, jdbcType=INTEGER},
+            version = version + 1
+        </set>
+        where is_deleted = 0
+        and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </update>
+
     <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         update stream_source
         <set>
@@ -351,7 +364,7 @@
             creator             = #{creator,jdbcType=VARCHAR},
             modifier            = #{modifier,jdbcType=VARCHAR}
         where id = #{id,jdbcType=INTEGER}
-        and version = #{version,jdbcType=INTEGER}
+          and version = #{version,jdbcType=INTEGER}
     </update>
     <update id="updateStatus">
         update stream_source
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index f4ba00366..43cbc1187 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -48,11 +48,7 @@
         where source_id = #{sourceId,jdbcType=INTEGER}
         and is_deleted = 0
     </select>
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-        delete
-        from stream_source_field
-        where id = #{id,jdbcType=INTEGER}
-    </delete>
+
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
@@ -231,6 +227,15 @@
             is_deleted       = #{isDeleted,jdbcType=INTEGER}
         where id = #{id,jdbcType=INTEGER}
     </update>
+    <update id="updateByRelatedId">
+        update stream_source_field
+        <set>
+            is_deleted = id
+        </set>
+        where is_deleted = 0
+        and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </update>
 
     <insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         insert into stream_source_field (id, inlong_group_id,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index f0940cecb..8037c2994 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -116,6 +116,16 @@ public interface StreamSourceService {
      */
     Boolean delete(Integer id, String operator);
 
+    /**
+     * Force deletes the stream source by groupId and streamId
+     *
+     * @param groupId The belongs group id.
+     * @param streamId The belongs stream id.
+     * @param operator Operator's name
+     * @return Whether succeed
+     */
+    Boolean forceDelete(String groupId, String streamId, String operator);
+
     /**
      * Delete the stream source by the given id and source type.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2f2214ce1..f3782a7cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -267,6 +267,20 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         return true;
     }
 
+    @Override
+    public Boolean forceDelete(String groupId, String streamId, String operator) {
+        LOGGER.info("begin to force delete source for groupId={} and streamId={} by user={}",
+                groupId, streamId, operator);
+        Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+        int sourceCount = sourceMapper.updateByRelatedId(groupId, streamId, SourceStatus.SOURCE_DISABLE.getCode());
+        int fieldCount = sourceFieldMapper.updateByRelatedId(groupId, streamId);
+        LOGGER.info("success to force delete source for groupId={} and streamId={} by user={},"
+                        + " update {} sources and {} fields", groupId, streamId, operator, sourceCount, fieldCount);
+        return true;
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
             isolation = Isolation.READ_COMMITTED)
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index acfdbd1ca..766fd9779 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -36,6 +37,7 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -85,4 +87,15 @@ public class StreamSourceController {
         return Response.success(result);
     }
 
-}
\ No newline at end of file
+    @RequestMapping(value = "/source/forceDelete", method = RequestMethod.DELETE)
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiOperation(value = "Force delete stream source by groupId and streamId")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
+    })
+    public Response<Boolean> forceDelete(@RequestParam String groupId, @RequestParam String streamId) {
+        return Response.success(sourceService.forceDelete(groupId, streamId, LoginUserUtils.getLoginUser().getName()));
+    }
+
+}