You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/26 06:51:32 UTC

[inlong] branch master updated: [INLONG-6282][Manager] Fix the delete API not working problem in the manager client (#6287)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bee011fd9 [INLONG-6282][Manager] Fix the delete API not working problem in the manager client (#6287)
bee011fd9 is described below

commit bee011fd99dbe9cbbfc58cc69e4ebc2919e253e7
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Oct 26 14:51:27 2022 +0800

    [INLONG-6282][Manager] Fix the delete API not working problem in the manager client (#6287)
---
 .../manager/client/api/impl/InlongStreamImpl.java  | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index da288673c..283331c5d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -77,6 +77,12 @@ public class InlongStreamImpl implements InlongStream {
 
     private Map<String, StreamTransform> streamTransforms = Maps.newHashMap();
 
+    private Set<String> sourcesToDelete = Sets.newHashSet();
+
+    private Set<String> sinksToDelete = Sets.newHashSet();
+
+    private Set<String> transformsToDelete = Sets.newHashSet();
+
     private List<StreamField> streamFields = Lists.newArrayList();
 
     /**
@@ -196,19 +202,19 @@ public class InlongStreamImpl implements InlongStream {
 
     @Override
     public InlongStream deleteSource(String sourceName) {
-        streamSources.remove(sourceName);
+        sourcesToDelete.add(sourceName);
         return this;
     }
 
     @Override
     public InlongStream deleteSink(String sinkName) {
-        streamSinks.remove(sinkName);
+        sinksToDelete.add(sinkName);
         return this;
     }
 
     @Override
     public InlongStream deleteTransform(String transformName) {
-        streamTransforms.remove(transformName);
+        transformsToDelete.add(transformName);
         return this;
     }
 
@@ -345,8 +351,12 @@ public class InlongStreamImpl implements InlongStream {
                             updateState.getValue()));
                 }
                 updateTransformNames.add(transformName);
-            } else {
-                log.warn("Unknown transform {} from server", transformName);
+            } else if (transformsToDelete.contains(transformName)) {
+                TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(transform,
+                        streamInfo);
+                if (!transformClient.deleteTransform(transformRequest)) {
+                    throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
+                }
             }
         }
         for (Map.Entry<String, StreamTransform> transformEntry : this.streamTransforms.entrySet()) {
@@ -379,8 +389,10 @@ public class InlongStreamImpl implements InlongStream {
                             updateState.getValue()));
                 }
                 updateSourceNames.add(sourceName);
-            } else {
-                log.warn("Unknown source {} from server", sourceName);
+            } else if (sourcesToDelete.contains(sourceName)) {
+                if (!sourceClient.deleteSource(id)) {
+                    throw new RuntimeException(String.format("Delete source=%s failed", source));
+                }
             }
         }
         for (Map.Entry<String, StreamSource> sourceEntry : this.streamSources.entrySet()) {
@@ -414,8 +426,10 @@ public class InlongStreamImpl implements InlongStream {
                             updateState.getValue()));
                 }
                 updateSinkNames.add(sinkName);
-            } else {
-                log.error("Unknown sink {} from server", sinkName);
+            } else if (sinksToDelete.contains(sinkName)) {
+                if (!sinkClient.deleteSink(id)) {
+                    throw new RuntimeException(String.format("Delete sink=%s failed", sink));
+                }
             }
         }