You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/13 03:13:11 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5849][Manager] Update the inlong group ext info by the client (#5850)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new d35a4df5b [INLONG-5849][Manager] Update the inlong group ext info by the client (#5850)
d35a4df5b is described below

commit d35a4df5b9b1f0a9716e205023f1562828e6b2ab
Author: healchow <he...@gmail.com>
AuthorDate: Tue Sep 13 11:03:22 2022 +0800

    [INLONG-5849][Manager] Update the inlong group ext info by the client (#5850)
---
 .../inlong/manager/client/cli/CreateCommand.java   |  4 +-
 .../manager/client/api/impl/InlongClientImpl.java  | 20 +++++-----
 .../manager/client/api/impl/InlongGroupImpl.java   | 45 +++++++++++-----------
 .../client/api/util/InlongGroupTransfer.java       |  7 +---
 4 files changed, 37 insertions(+), 39 deletions(-)

diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 2a099f958..24b46f34a 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -70,9 +70,9 @@ public class CreateCommand extends AbstractCommand {
                         return;
                     }
                 }
-                // first extract groupconfig from the file passed in
+                // first extract group config from the file passed in
                 CreateGroupConf groupConf = objectMapper.readValue(content, CreateGroupConf.class);
-                // get the correspodning inlonggroup, a.k.a the task to execute
+                // get the corresponding inlong group, aka the task to execute
                 InlongClient inlongClient = ClientUtils.getClient();
                 InlongGroup group = inlongClient.forGroup(groupConf.getGroupInfo());
                 InlongStreamBuilder streamBuilder = group.createStream(groupConf.getStreamInfo());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index eac22c0fa..9e84a30ef 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -28,12 +28,12 @@ import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongCluster;
 import org.apache.inlong.manager.client.api.InlongGroup;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
 import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
@@ -108,17 +108,17 @@ public class InlongClientImpl implements InlongClient {
 
     @Override
     public List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) {
-        PageResult<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(expr, status, pageNum,
-                pageSize);
+        PageResult<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(expr, status, pageNum, pageSize);
         if (CollectionUtils.isEmpty(pageInfo.getList())) {
             return Lists.newArrayList();
-        } else {
-            return pageInfo.getList().stream().map(briefInfo -> {
-                String groupId = briefInfo.getInlongGroupId();
-                InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
-                return new InlongGroupImpl(groupInfo, configuration);
-            }).collect(Collectors.toList());
         }
+
+        return pageInfo.getList().stream()
+                .map(info -> {
+                    String groupId = info.getInlongGroupId();
+                    InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
+                    return new InlongGroupImpl(groupInfo, configuration);
+                }).collect(Collectors.toList());
     }
 
     @Override
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 348dcf3ea..df65134b4 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
+import com.google.common.base.Objects;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
@@ -62,7 +63,7 @@ public class InlongGroupImpl implements InlongGroup {
 
     private final InnerGroupContext groupContext;
     private final InlongGroupClient groupClient;
-    private final WorkflowClient workFlowClient;
+    private final WorkflowClient workflowClient;
     private final InlongStreamClient streamClient;
     private final ClientConfiguration configuration;
     private InlongGroupInfo groupInfo;
@@ -70,23 +71,25 @@ public class InlongGroupImpl implements InlongGroup {
     public InlongGroupImpl(InlongGroupInfo groupInfo, ClientConfiguration configuration) {
         this.groupInfo = groupInfo;
         this.groupContext = new InnerGroupContext();
-        this.groupContext.setGroupInfo(groupInfo);
         this.configuration = configuration;
 
         ClientFactory clientFactory = ClientUtils.getClientFactory(configuration);
         this.streamClient = clientFactory.getStreamClient();
         this.groupClient = clientFactory.getGroupClient();
-        this.workFlowClient = clientFactory.getWorkflowClient();
-
-        InlongGroupInfo newGroupInfo = groupClient.getGroupIfExists(groupInfo.getInlongGroupId());
-        if (newGroupInfo != null) {
-            this.groupContext.setGroupInfo(newGroupInfo);
-        } else {
-            BaseSortConf sortConf = groupInfo.getSortConf();
-            InlongGroupTransfer.createGroupInfo(groupInfo, sortConf);
-            String groupId = groupClient.createGroup(groupInfo.genRequest());
+        this.workflowClient = clientFactory.getWorkflowClient();
+
+        String groupId = groupInfo.getInlongGroupId();
+        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
+        if (existGroupInfo == null) {
+            InlongGroupTransfer.createGroupInfo(groupInfo, groupInfo.getSortConf());
+            groupId = groupClient.createGroup(groupInfo.genRequest());
             groupInfo.setInlongGroupId(groupId);
         }
+
+        // after creating, change the group version
+        InlongGroupInfo newGroupInfo = groupClient.getGroupInfo(groupId);
+        groupInfo.setVersion(newGroupInfo.getVersion());
+        this.groupContext.setGroupInfo(groupInfo);
     }
 
     @Override
@@ -129,7 +132,7 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkNotNull(groupProcessForm, "ApplyGroupProcessForm cannot be null");
         groupContext.setInitMsg(groupProcessForm);
         assert groupProcessForm != null;
-        WorkflowResult startWorkflowResult = workFlowClient.startInlongGroup(taskId, groupProcessForm);
+        WorkflowResult startWorkflowResult = workflowClient.startInlongGroup(taskId, groupProcessForm);
         processView = startWorkflowResult.getProcessInfo();
         Preconditions.checkTrue(ProcessStatus.COMPLETED == processView.getStatus(),
                 String.format("inlong group status %s is incorrect, should be COMPLETED", processView.getStatus()));
@@ -142,8 +145,7 @@ public class InlongGroupImpl implements InlongGroup {
             originGroupInfo = this.groupInfo;
         }
 
-        final String groupId = originGroupInfo.getInlongGroupId();
-        Preconditions.checkTrue(groupId != null && groupId.equals(this.groupInfo.getInlongGroupId()),
+        Preconditions.checkTrue(Objects.equal(originGroupInfo.getInlongGroupId(), this.groupInfo.getInlongGroupId()),
                 "groupId must be same");
 
         InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
@@ -175,14 +177,13 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext reInitOnUpdate(InlongGroupInfo originGroupInfo, BaseSortConf sortConf) throws Exception {
         this.update(originGroupInfo, sortConf);
-        String inlongGroupId = this.groupContext.getGroupInfo().getInlongGroupId();
-        InlongGroupInfo newGroupInfo = groupClient.getGroupIfExists(inlongGroupId);
-        if (newGroupInfo != null) {
-            this.groupContext.setGroupInfo(newGroupInfo);
-        } else {
-            throw new RuntimeException(String.format("Group not found by inlongGroupId=%s", inlongGroupId));
+        String groupId = this.groupContext.getGroupInfo().getInlongGroupId();
+        InlongGroupInfo newGroupInfo = groupClient.getGroupInfo(groupId);
+        if (newGroupInfo == null) {
+            throw new RuntimeException(String.format("Group not found by inlongGroupId=%s", groupId));
         }
 
+        this.groupContext.setGroupInfo(newGroupInfo);
         return init();
     }
 
@@ -260,14 +261,14 @@ public class InlongGroupImpl implements InlongGroup {
     }
 
     private InlongGroupContext generateSnapshot() {
-        // fetch current group
         InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupContext.getGroupId());
-        // if current group is not exists, set deleted status
+        // if current group is not exists, set its status to deleted
         if (groupInfo == null) {
             groupInfo = groupContext.getGroupInfo();
             groupInfo.setStatus(GroupStatus.DELETED.getCode());
             return new InlongGroupContext(groupContext);
         }
+
         groupContext.setGroupInfo(groupInfo);
         String inlongGroupId = groupInfo.getInlongGroupId();
         // fetch stream in group
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index c9f7cfc65..d079a406b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.client.api.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.auth.Authentication;
@@ -52,10 +51,8 @@ public class InlongGroupTransfer {
         String groupId = groupInfo.getInlongGroupId();
         Preconditions.checkNotEmpty(groupId, "groupId cannot be empty");
         // init extensions
-        if (groupInfo.getExtList() != null) {
-            groupInfo.setExtList(groupInfo.getExtList());
-        } else {
-            groupInfo.setExtList(Lists.newArrayList());
+        if (groupInfo.getExtList() == null) {
+            groupInfo.setExtList(new ArrayList<>());
         }
         // set authentication into group ext list
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();