You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/12 09:58:10 UTC
[inlong] branch master updated: [INLONG-5529][Manger] Fix the error of version not set before updating operation (#5530)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cdd53a510 [INLONG-5529][Manger] Fix the error of version not set before updating operation (#5530)
cdd53a510 is described below
commit cdd53a510eef77407a874b6832d76eee511ee4ab
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Aug 12 17:58:05 2022 +0800
[INLONG-5529][Manger] Fix the error of version not set before updating operation (#5530)
---
.../manager/client/api/impl/DefaultInlongStreamBuilder.java | 5 ++++-
.../inlong/manager/client/api/impl/InlongGroupImpl.java | 12 ++++++++----
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 51477286b..b3d16e4bb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -34,6 +34,7 @@ import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
import org.apache.inlong.manager.client.api.inner.client.StreamTransformClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -45,7 +46,6 @@ import org.apache.inlong.manager.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.pojo.stream.StreamTransform;
import org.apache.inlong.manager.pojo.transform.TransformRequest;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
-import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.List;
import java.util.Map;
@@ -192,6 +192,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
} else {
TransformRequest transformRequest = transformRequests.get(transformName);
transformRequest.setId(id);
+ transformRequest.setVersion(transformResponse.getVersion());
Pair<Boolean, String> updateState = transformClient.updateTransform(transformRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
@@ -230,6 +231,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
} else {
SourceRequest sourceRequest = sourceRequests.get(sourceName);
sourceRequest.setId(id);
+ sourceRequest.setVersion(source.getVersion());
Pair<Boolean, String> updateState = sourceClient.updateSource(sourceRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
@@ -268,6 +270,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
} else {
SinkRequest sinkRequest = sinkRequests.get(sinkName);
sinkRequest.setId(id);
+ sinkRequest.setVersion(sink.getVersion());
Pair<Boolean, String> updateState = sinkClient.updateSink(sinkRequest);
if (!updateState.getKey()) {
throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8e73a8750..857680fff 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -81,7 +81,7 @@ public class InlongGroupImpl implements InlongGroup {
InlongGroupInfo newGroupInfo = groupClient.getGroupIfExists(groupInfo.getInlongGroupId());
if (newGroupInfo != null) {
- this.groupContext.setGroupInfo(groupInfo);
+ this.groupContext.setGroupInfo(newGroupInfo);
} else {
BaseSortConf sortConf = groupInfo.getSortConf();
InlongGroupTransfer.createGroupInfo(groupInfo, sortConf);
@@ -148,11 +148,13 @@ public class InlongGroupImpl implements InlongGroup {
"groupId must be same");
InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
+ Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
"inlong group is in init status, should not be updated");
InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
+ groupInfo.setVersion(existGroupInfo.getVersion());
InlongGroupRequest groupRequest = groupInfo.genRequest();
Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
String errMsg = idAndErr.getValue();
@@ -167,13 +169,15 @@ public class InlongGroupImpl implements InlongGroup {
Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
final String groupId = this.groupInfo.getInlongGroupId();
- InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
- SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(groupInfo.getStatus());
+ InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
+ Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
+ SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
"inlong group is in init status, should not be updated");
- groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
+ InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
+ groupInfo.setVersion(existGroupInfo.getVersion());
InlongGroupRequest groupRequest = groupInfo.genRequest();
Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
String errMsg = idAndErr.getValue();