You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/26 06:51:32 UTC
[inlong] branch master updated: [INLONG-6282][Manager] Fix the delete API not working problem in the manager client (#6287)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bee011fd9 [INLONG-6282][Manager] Fix the delete API not working problem in the manager client (#6287)
bee011fd9 is described below
commit bee011fd99dbe9cbbfc58cc69e4ebc2919e253e7
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Oct 26 14:51:27 2022 +0800
[INLONG-6282][Manager] Fix the delete API not working problem in the manager client (#6287)
---
.../manager/client/api/impl/InlongStreamImpl.java | 32 ++++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index da288673c..283331c5d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -77,6 +77,12 @@ public class InlongStreamImpl implements InlongStream {
private Map<String, StreamTransform> streamTransforms = Maps.newHashMap();
+ private Set<String> sourcesToDelete = Sets.newHashSet();
+
+ private Set<String> sinksToDelete = Sets.newHashSet();
+
+ private Set<String> transformsToDelete = Sets.newHashSet();
+
private List<StreamField> streamFields = Lists.newArrayList();
/**
@@ -196,19 +202,19 @@ public class InlongStreamImpl implements InlongStream {
@Override
public InlongStream deleteSource(String sourceName) {
- streamSources.remove(sourceName);
+ sourcesToDelete.add(sourceName);
return this;
}
@Override
public InlongStream deleteSink(String sinkName) {
- streamSinks.remove(sinkName);
+ sinksToDelete.add(sinkName);
return this;
}
@Override
public InlongStream deleteTransform(String transformName) {
- streamTransforms.remove(transformName);
+ transformsToDelete.add(transformName);
return this;
}
@@ -345,8 +351,12 @@ public class InlongStreamImpl implements InlongStream {
updateState.getValue()));
}
updateTransformNames.add(transformName);
- } else {
- log.warn("Unknown transform {} from server", transformName);
+ } else if (transformsToDelete.contains(transformName)) {
+ TransformRequest transformRequest = StreamTransformTransfer.createTransformRequest(transform,
+ streamInfo);
+ if (!transformClient.deleteTransform(transformRequest)) {
+ throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
+ }
}
}
for (Map.Entry<String, StreamTransform> transformEntry : this.streamTransforms.entrySet()) {
@@ -379,8 +389,10 @@ public class InlongStreamImpl implements InlongStream {
updateState.getValue()));
}
updateSourceNames.add(sourceName);
- } else {
- log.warn("Unknown source {} from server", sourceName);
+ } else if (sourcesToDelete.contains(sourceName)) {
+ if (!sourceClient.deleteSource(id)) {
+ throw new RuntimeException(String.format("Delete source=%s failed", source));
+ }
}
}
for (Map.Entry<String, StreamSource> sourceEntry : this.streamSources.entrySet()) {
@@ -414,8 +426,10 @@ public class InlongStreamImpl implements InlongStream {
updateState.getValue()));
}
updateSinkNames.add(sinkName);
- } else {
- log.error("Unknown sink {} from server", sinkName);
+ } else if (sinksToDelete.contains(sinkName)) {
+ if (!sinkClient.deleteSink(id)) {
+ throw new RuntimeException(String.format("Delete sink=%s failed", sink));
+ }
}
}