You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/17 13:52:53 UTC

[incubator-inlong] branch master updated: [INLONG-3192][Manager] Add source state collect in Manager Client (#3194)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 01cc4f0  [INLONG-3192][Manager] Add source state collect in Manager Client (#3194)
01cc4f0 is described below

commit 01cc4f03a8e274a9f728ba7749a28ed6253851e7
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 17 21:52:43 2022 +0800

    [INLONG-3192][Manager] Add source state collect in Manager Client (#3194)
---
 .../manager/client/api/InlongGroupContext.java     | 71 ++++++++++++++++------
 .../inlong/manager/client/api/StreamSource.java    | 38 +++++++++++-
 .../manager/client/api/impl/InlongGroupImpl.java   |  4 +-
 .../api/util/InlongStreamSourceTransfer.java       |  6 +-
 4 files changed, 96 insertions(+), 23 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 68789e1..92aaf76 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -17,11 +17,16 @@
 
 package org.apache.inlong.manager.client.api;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.Data;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.inlong.manager.client.api.StreamSource.State;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 
@@ -53,12 +58,12 @@ public class InlongGroupContext implements Serializable {
     /**
      * Error message for Inlong group, taskName->exceptionMsg.
      */
-    private Map<String, List<String>> errMsgs;
+    private Map<String, List<String>> groupErrLogs;
 
     /**
      * Logs for each stream, key: streamName, value: componentName->log
      */
-    private Map<String, Map<String, List<String>>> streamLogs = Maps.newHashMap();
+    private Map<String, Map<String, List<String>>> streamErrLogs = Maps.newHashMap();
 
     private InlongGroupState state;
 
@@ -69,9 +74,10 @@ public class InlongGroupContext implements Serializable {
         this.groupName = groupInfo.getName();
         this.groupConf = streamGroupConf;
         this.inlongStreamMap = groupContext.getStreamMap();
-        this.errMsgs = Maps.newHashMap();
+        this.groupErrLogs = Maps.newHashMap();
         this.groupLogs = Maps.newHashMap();
         this.state = InlongGroupState.parseByBizStatus(groupInfo.getStatus());
+        recheckState();
         this.extensions = Maps.newHashMap();
         List<InlongGroupExtInfo> extInfos = groupInfo.getExtList();
         if (CollectionUtils.isNotEmpty(extInfos)) {
@@ -81,36 +87,63 @@ public class InlongGroupContext implements Serializable {
         }
     }
 
+    private void recheckState() {
+        if (MapUtils.isEmpty(this.inlongStreamMap)) {
+            return;
+        }
+        List<StreamSource> failedSources = Lists.newArrayList();
+        this.inlongStreamMap.values().stream().forEach(inlongStream -> {
+            Map<String, StreamSource> sources = inlongStream.getSources();
+            if (MapUtils.isNotEmpty(sources)) {
+                for (Map.Entry<String, StreamSource> entry : sources.entrySet()) {
+                    StreamSource source = entry.getValue();
+                    if (source.getState() == State.FAILED) {
+                        failedSources.add(source);
+                    }
+                }
+            }
+        });
+        if (CollectionUtils.isNotEmpty(failedSources)) {
+            this.state = InlongGroupState.FAILED;
+            for (StreamSource failedSource : failedSources) {
+                this.groupErrLogs.computeIfAbsent("failedSources", Lists::newArrayList)
+                        .add(GsonUtil.toJson(failedSource));
+            }
+        }
+    }
+
     public enum InlongGroupState {
         CREATE, REJECTED, INITIALIZING, OPERATING, STARTED, FAILED, STOPPED, FINISHED, DELETED;
 
         // Reference to  org.apache.inlong.manager.common.enums.GroupState code
         public static InlongGroupState parseByBizStatus(int bizCode) {
 
-            switch (bizCode) {
-                case 0:
-                case 100:
+            GroupState groupState = GroupState.forCode(bizCode);
+
+            switch (groupState) {
+                case DRAFT:
+                case TO_BE_SUBMIT:
                     return CREATE;
-                case 41:
-                case 141:
-                case 151:
+                case DELETING:
+                case SUSPENDING:
+                case RESTARTING:
                     return OPERATING;
-                case 102:
+                case APPROVE_REJECTED:
                     return REJECTED;
-                case 101:
-                case 103:
-                case 110:
+                case TO_BE_APPROVAL:
+                case APPROVE_PASSED:
+                case CONFIG_ING:
                     return INITIALIZING;
-                case 120:
+                case CONFIG_FAILED:
                     return FAILED;
-                case 130:
-                case 150:
+                case CONFIG_SUCCESSFUL:
+                case RESTARTED:
                     return STARTED;
-                case 140:
+                case SUSPENDED:
                     return STOPPED;
-                case 131:
+                case FINISH:
                     return FINISHED;
-                case 40:
+                case DELETED:
                     return DELETED;
                 default:
                     throw new IllegalArgumentException(String.format("Unsupported status %s for group", bizCode));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index 354d674..dca1183 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -20,12 +20,46 @@ package org.apache.inlong.manager.client.api;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import org.apache.inlong.manager.common.enums.SourceState;
 import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
 @ApiModel("Stream source configuration")
 public abstract class StreamSource {
 
+    public enum State {
+        INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
+
+        public static State parseByStatus(int status) {
+            SourceState sourceState = SourceState.forCode(status);
+            switch (sourceState) {
+                case SOURCE_NEW:
+                case TO_BE_ISSUED_ADD:
+                case BEEN_ISSUED_ADD:
+                case TO_BE_ISSUED_ACTIVE:
+                case BEEN_ISSUED_ACTIVE:
+                    return INIT;
+                case SOURCE_NORMAL:
+                    return NORMAL;
+                case TO_BE_ISSUED_FROZEN:
+                case BEEN_ISSUED_FROZEN:
+                    return FROZING;
+                case SOURCE_FROZEN:
+                    return FROZEN;
+                case SOURCE_FAILED:
+                    return FAILED;
+                case TO_BE_ISSUED_DELETE:
+                case BEEN_ISSUED_DELETE:
+                    return DELETING;
+                case SOURCE_DISABLE:
+                    return DELETE;
+                default:
+                    throw new IllegalStateException(
+                            String.format("Unsupported source state=%s for Inlong", sourceState));
+            }
+        }
+    }
+
     public enum SyncType {
         FULL, INCREMENT
     }
@@ -39,10 +73,12 @@ public abstract class StreamSource {
     @ApiModelProperty("Mac uuid of the agent running the task")
     private String uuid = "";
 
+    @ApiModelProperty("State of stream source")
+    private State state;
+
     public abstract SourceType getSourceType();
 
     public abstract SyncType getSyncType();
 
     public abstract DataFormat getDataFormat();
-
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 413ba45..43c7ec4 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -237,7 +237,7 @@ public class InlongGroupImpl implements InlongGroup {
                             groupLogs.computeIfAbsent(taskName, Lists::newArrayList).add(eventLogView.getRemark());
                         }
                     });
-            inlongGroupContext.setErrMsgs(errMsgs);
+            inlongGroupContext.setGroupErrLogs(errMsgs);
             inlongGroupContext.setGroupLogs(groupLogs);
         }
         //Fetch stream logs
@@ -254,7 +254,7 @@ public class InlongGroupImpl implements InlongGroup {
                             String log = GsonUtil.toJson(streamLog);
                             streamLogs.computeIfAbsent(componentName, Lists::newArrayList).add(log);
                         });
-                inlongGroupContext.getStreamLogs().put(streamName, streamLogs);
+                inlongGroupContext.getStreamErrLogs().put(streamName, streamLogs);
             }
         });
         return inlongGroupContext;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 0294a79..92a8e7a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.KafkaOffset;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamSource.State;
 import org.apache.inlong.manager.client.api.StreamSource.SyncType;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
@@ -88,6 +89,7 @@ public class InlongStreamSourceTransfer {
         kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
         DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
         kafkaSource.setDataFormat(dataFormat);
+        kafkaSource.setState(State.parseByStatus(kafkaSourceResponse.getStatus()));
         kafkaSource.setAgentIp(kafkaSourceResponse.getAgentIp());
         kafkaSource.setTopic(kafkaSourceResponse.getTopic());
         kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
@@ -106,7 +108,7 @@ public class InlongStreamSourceTransfer {
         KafkaSource kafkaSource = new KafkaSource();
         kafkaSource.setSourceName(kafkaResponse.getSourceName());
         kafkaSource.setConsumerGroup(kafkaResponse.getGroupId());
-
+        kafkaSource.setState(State.parseByStatus(kafkaResponse.getStatus()));
         DataFormat dataFormat = DataFormat.forName(kafkaResponse.getSerializationType());
         kafkaSource.setDataFormat(dataFormat);
         kafkaSource.setTopic(kafkaResponse.getTopic());
@@ -128,6 +130,7 @@ public class InlongStreamSourceTransfer {
         binlogSource.setDataFormat(DataFormat.NONE);
         binlogSource.setPort(response.getPort());
         binlogSource.setAgentIp(response.getAgentIp());
+        binlogSource.setState(State.parseByStatus(response.getStatus()));
         DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
                 response.getUser(),
                 response.getPassword());
@@ -153,6 +156,7 @@ public class InlongStreamSourceTransfer {
         binlogSource.setHostname(response.getHostname());
         binlogSource.setDataFormat(DataFormat.NONE);
         binlogSource.setPort(response.getPort());
+        binlogSource.setState(State.parseByStatus(response.getStatus()));
         DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
                 response.getUser(),
                 response.getPassword());