You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/13 03:03:27 UTC
[inlong] branch master updated: [INLONG-5849][Manager] Update the inlong group ext info by the client (#5850)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c85b1bc2f [INLONG-5849][Manager] Update the inlong group ext info by the client (#5850)
c85b1bc2f is described below
commit c85b1bc2f04d8844e787f5fc0c52f39399a3296d
Author: healchow <he...@gmail.com>
AuthorDate: Tue Sep 13 11:03:22 2022 +0800
[INLONG-5849][Manager] Update the inlong group ext info by the client (#5850)
---
.../inlong/manager/client/cli/CreateCommand.java | 4 +-
.../manager/client/api/impl/InlongClientImpl.java | 20 +++++-----
.../manager/client/api/impl/InlongGroupImpl.java | 45 +++++++++++-----------
.../client/api/util/InlongGroupTransfer.java | 7 +---
4 files changed, 37 insertions(+), 39 deletions(-)
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 2a099f958..24b46f34a 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -70,9 +70,9 @@ public class CreateCommand extends AbstractCommand {
return;
}
}
- // first extract groupconfig from the file passed in
+ // first extract group config from the file passed in
CreateGroupConf groupConf = objectMapper.readValue(content, CreateGroupConf.class);
- // get the correspodning inlonggroup, a.k.a the task to execute
+ // get the corresponding inlong group, aka the task to execute
InlongClient inlongClient = ClientUtils.getClient();
InlongGroup group = inlongClient.forGroup(groupConf.getGroupInfo());
InlongStreamBuilder streamBuilder = group.createStream(groupConf.getStreamInfo());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index eac22c0fa..9e84a30ef 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -28,12 +28,12 @@ import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongCluster;
import org.apache.inlong.manager.client.api.InlongGroup;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
@@ -108,17 +108,17 @@ public class InlongClientImpl implements InlongClient {
@Override
public List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) {
- PageResult<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(expr, status, pageNum,
- pageSize);
+ PageResult<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(expr, status, pageNum, pageSize);
if (CollectionUtils.isEmpty(pageInfo.getList())) {
return Lists.newArrayList();
- } else {
- return pageInfo.getList().stream().map(briefInfo -> {
- String groupId = briefInfo.getInlongGroupId();
- InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
- return new InlongGroupImpl(groupInfo, configuration);
- }).collect(Collectors.toList());
}
+
+ return pageInfo.getList().stream()
+ .map(info -> {
+ String groupId = info.getInlongGroupId();
+ InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
+ return new InlongGroupImpl(groupInfo, configuration);
+ }).collect(Collectors.toList());
}
@Override
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 348dcf3ea..df65134b4 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api.impl;
+import com.google.common.base.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
@@ -62,7 +63,7 @@ public class InlongGroupImpl implements InlongGroup {
private final InnerGroupContext groupContext;
private final InlongGroupClient groupClient;
- private final WorkflowClient workFlowClient;
+ private final WorkflowClient workflowClient;
private final InlongStreamClient streamClient;
private final ClientConfiguration configuration;
private InlongGroupInfo groupInfo;
@@ -70,23 +71,25 @@ public class InlongGroupImpl implements InlongGroup {
public InlongGroupImpl(InlongGroupInfo groupInfo, ClientConfiguration configuration) {
this.groupInfo = groupInfo;
this.groupContext = new InnerGroupContext();
- this.groupContext.setGroupInfo(groupInfo);
this.configuration = configuration;
ClientFactory clientFactory = ClientUtils.getClientFactory(configuration);
this.streamClient = clientFactory.getStreamClient();
this.groupClient = clientFactory.getGroupClient();
- this.workFlowClient = clientFactory.getWorkflowClient();
-
- InlongGroupInfo newGroupInfo = groupClient.getGroupIfExists(groupInfo.getInlongGroupId());
- if (newGroupInfo != null) {
- this.groupContext.setGroupInfo(newGroupInfo);
- } else {
- BaseSortConf sortConf = groupInfo.getSortConf();
- InlongGroupTransfer.createGroupInfo(groupInfo, sortConf);
- String groupId = groupClient.createGroup(groupInfo.genRequest());
+ this.workflowClient = clientFactory.getWorkflowClient();
+
+ String groupId = groupInfo.getInlongGroupId();
+ InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
+ if (existGroupInfo == null) {
+ InlongGroupTransfer.createGroupInfo(groupInfo, groupInfo.getSortConf());
+ groupId = groupClient.createGroup(groupInfo.genRequest());
groupInfo.setInlongGroupId(groupId);
}
+
+ // re-fetch the group (existing or just created) and copy its version into the local group info
+ InlongGroupInfo newGroupInfo = groupClient.getGroupInfo(groupId);
+ groupInfo.setVersion(newGroupInfo.getVersion());
+ this.groupContext.setGroupInfo(groupInfo);
}
@Override
@@ -129,7 +132,7 @@ public class InlongGroupImpl implements InlongGroup {
Preconditions.checkNotNull(groupProcessForm, "ApplyGroupProcessForm cannot be null");
groupContext.setInitMsg(groupProcessForm);
assert groupProcessForm != null;
- WorkflowResult startWorkflowResult = workFlowClient.startInlongGroup(taskId, groupProcessForm);
+ WorkflowResult startWorkflowResult = workflowClient.startInlongGroup(taskId, groupProcessForm);
processView = startWorkflowResult.getProcessInfo();
Preconditions.checkTrue(ProcessStatus.COMPLETED == processView.getStatus(),
String.format("inlong group status %s is incorrect, should be COMPLETED", processView.getStatus()));
@@ -142,8 +145,7 @@ public class InlongGroupImpl implements InlongGroup {
originGroupInfo = this.groupInfo;
}
- final String groupId = originGroupInfo.getInlongGroupId();
- Preconditions.checkTrue(groupId != null && groupId.equals(this.groupInfo.getInlongGroupId()),
+ Preconditions.checkTrue(Objects.equal(originGroupInfo.getInlongGroupId(), this.groupInfo.getInlongGroupId()),
"groupId must be same");
InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
@@ -175,14 +177,13 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext reInitOnUpdate(InlongGroupInfo originGroupInfo, BaseSortConf sortConf) throws Exception {
this.update(originGroupInfo, sortConf);
- String inlongGroupId = this.groupContext.getGroupInfo().getInlongGroupId();
- InlongGroupInfo newGroupInfo = groupClient.getGroupIfExists(inlongGroupId);
- if (newGroupInfo != null) {
- this.groupContext.setGroupInfo(newGroupInfo);
- } else {
- throw new RuntimeException(String.format("Group not found by inlongGroupId=%s", inlongGroupId));
+ String groupId = this.groupContext.getGroupInfo().getInlongGroupId();
+ InlongGroupInfo newGroupInfo = groupClient.getGroupInfo(groupId);
+ if (newGroupInfo == null) {
+ throw new RuntimeException(String.format("Group not found by inlongGroupId=%s", groupId));
}
+ this.groupContext.setGroupInfo(newGroupInfo);
return init();
}
@@ -260,14 +261,14 @@ public class InlongGroupImpl implements InlongGroup {
}
private InlongGroupContext generateSnapshot() {
- // fetch current group
InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupContext.getGroupId());
- // if current group is not exists, set deleted status
+ // if the current group does not exist, set its status to deleted
if (groupInfo == null) {
groupInfo = groupContext.getGroupInfo();
groupInfo.setStatus(GroupStatus.DELETED.getCode());
return new InlongGroupContext(groupContext);
}
+
groupContext.setGroupInfo(groupInfo);
String inlongGroupId = groupInfo.getInlongGroupId();
// fetch stream in group
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index c9f7cfc65..d079a406b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.client.api.util;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.auth.Authentication;
@@ -52,10 +51,8 @@ public class InlongGroupTransfer {
String groupId = groupInfo.getInlongGroupId();
Preconditions.checkNotEmpty(groupId, "groupId cannot be empty");
// init extensions
- if (groupInfo.getExtList() != null) {
- groupInfo.setExtList(groupInfo.getExtList());
- } else {
- groupInfo.setExtList(Lists.newArrayList());
+ if (groupInfo.getExtList() == null) {
+ groupInfo.setExtList(new ArrayList<>());
}
// set authentication into group ext list
List<InlongGroupExtInfo> extInfos = new ArrayList<>();