You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/12 09:58:10 UTC

[inlong] branch master updated: [INLONG-5529][Manger] Fix the error of version not set before updating operation (#5530)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cdd53a510 [INLONG-5529][Manger] Fix the error of version not set before updating operation (#5530)
cdd53a510 is described below

commit cdd53a510eef77407a874b6832d76eee511ee4ab
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Aug 12 17:58:05 2022 +0800

    [INLONG-5529][Manger] Fix the error of version not set before updating operation (#5530)
---
 .../manager/client/api/impl/DefaultInlongStreamBuilder.java  |  5 ++++-
 .../inlong/manager/client/api/impl/InlongGroupImpl.java      | 12 ++++++++----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 51477286b..b3d16e4bb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -34,6 +34,7 @@ import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
 import org.apache.inlong.manager.client.api.inner.client.StreamTransformClient;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -45,7 +46,6 @@ import org.apache.inlong.manager.pojo.stream.StreamPipeline;
 import org.apache.inlong.manager.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.pojo.transform.TransformRequest;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
-import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -192,6 +192,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
             } else {
                 TransformRequest transformRequest = transformRequests.get(transformName);
                 transformRequest.setId(id);
+                transformRequest.setVersion(transformResponse.getVersion());
                 Pair<Boolean, String> updateState = transformClient.updateTransform(transformRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
@@ -230,6 +231,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
                 } else {
                     SourceRequest sourceRequest = sourceRequests.get(sourceName);
                     sourceRequest.setId(id);
+                    sourceRequest.setVersion(source.getVersion());
                     Pair<Boolean, String> updateState = sourceClient.updateSource(sourceRequest);
                     if (!updateState.getKey()) {
                         throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
@@ -268,6 +270,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
             } else {
                 SinkRequest sinkRequest = sinkRequests.get(sinkName);
                 sinkRequest.setId(id);
+                sinkRequest.setVersion(sink.getVersion());
                 Pair<Boolean, String> updateState = sinkClient.updateSink(sinkRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8e73a8750..857680fff 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -81,7 +81,7 @@ public class InlongGroupImpl implements InlongGroup {
 
         InlongGroupInfo newGroupInfo = groupClient.getGroupIfExists(groupInfo.getInlongGroupId());
         if (newGroupInfo != null) {
-            this.groupContext.setGroupInfo(groupInfo);
+            this.groupContext.setGroupInfo(newGroupInfo);
         } else {
             BaseSortConf sortConf = groupInfo.getSortConf();
             InlongGroupTransfer.createGroupInfo(groupInfo, sortConf);
@@ -148,11 +148,13 @@ public class InlongGroupImpl implements InlongGroup {
                 "groupId must be same");
 
         InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
+        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
         SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
         Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
                 "inlong group is in init status, should not be updated");
 
         InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
+        groupInfo.setVersion(existGroupInfo.getVersion());
         InlongGroupRequest groupRequest = groupInfo.genRequest();
         Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
         String errMsg = idAndErr.getValue();
@@ -167,13 +169,15 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
 
         final String groupId = this.groupInfo.getInlongGroupId();
-        InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
 
-        SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(groupInfo.getStatus());
+        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
+        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
+        SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
         Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
                 "inlong group is in init status, should not be updated");
 
-        groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
+        InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
+        groupInfo.setVersion(existGroupInfo.getVersion());
         InlongGroupRequest groupRequest = groupInfo.genRequest();
         Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
         String errMsg = idAndErr.getValue();