You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/17 13:52:53 UTC
[incubator-inlong] branch master updated: [INLONG-3192][Manager] Add source state collect in Manager Client (#3194)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 01cc4f0 [INLONG-3192][Manager] Add source state collect in Manager Client (#3194)
01cc4f0 is described below
commit 01cc4f03a8e274a9f728ba7749a28ed6253851e7
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 17 21:52:43 2022 +0800
[INLONG-3192][Manager] Add source state collect in Manager Client (#3194)
---
.../manager/client/api/InlongGroupContext.java | 71 ++++++++++++++++------
.../inlong/manager/client/api/StreamSource.java | 38 +++++++++++-
.../manager/client/api/impl/InlongGroupImpl.java | 4 +-
.../api/util/InlongStreamSourceTransfer.java | 6 +-
4 files changed, 96 insertions(+), 23 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 68789e1..92aaf76 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -17,11 +17,16 @@
package org.apache.inlong.manager.client.api;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.inlong.manager.client.api.StreamSource.State;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -53,12 +58,12 @@ public class InlongGroupContext implements Serializable {
/**
* Error message for Inlong group, taskName->exceptionMsg.
*/
- private Map<String, List<String>> errMsgs;
+ private Map<String, List<String>> groupErrLogs;
/**
* Logs for each stream, key: streamName, value: componentName->log
*/
- private Map<String, Map<String, List<String>>> streamLogs = Maps.newHashMap();
+ private Map<String, Map<String, List<String>>> streamErrLogs = Maps.newHashMap();
private InlongGroupState state;
@@ -69,9 +74,10 @@ public class InlongGroupContext implements Serializable {
this.groupName = groupInfo.getName();
this.groupConf = streamGroupConf;
this.inlongStreamMap = groupContext.getStreamMap();
- this.errMsgs = Maps.newHashMap();
+ this.groupErrLogs = Maps.newHashMap();
this.groupLogs = Maps.newHashMap();
this.state = InlongGroupState.parseByBizStatus(groupInfo.getStatus());
+ recheckState();
this.extensions = Maps.newHashMap();
List<InlongGroupExtInfo> extInfos = groupInfo.getExtList();
if (CollectionUtils.isNotEmpty(extInfos)) {
@@ -81,36 +87,63 @@ public class InlongGroupContext implements Serializable {
}
}
+    /**
+     * Re-evaluates the group state from the state of its stream sources.
+     *
+     * <p>Does nothing when {@code inlongStreamMap} is empty. Otherwise every source of every
+     * stream is inspected; if at least one source is in {@link State#FAILED}, the group state
+     * is set to {@link InlongGroupState#FAILED} and each failed source is appended, serialized
+     * with {@code GsonUtil.toJson}, to the {@code "failedSources"} entry of {@code groupErrLogs}.
+     */
+    private void recheckState() {
+        if (MapUtils.isEmpty(this.inlongStreamMap)) {
+            return;
+        }
+        List<StreamSource> failedSources = Lists.newArrayList();
+        this.inlongStreamMap.values().forEach(inlongStream -> {
+            Map<String, StreamSource> sources = inlongStream.getSources();
+            if (MapUtils.isEmpty(sources)) {
+                return;
+            }
+            for (StreamSource source : sources.values()) {
+                if (source.getState() == State.FAILED) {
+                    failedSources.add(source);
+                }
+            }
+        });
+        if (failedSources.isEmpty()) {
+            return;
+        }
+        this.state = InlongGroupState.FAILED;
+        // Use an explicit lambda rather than Lists::newArrayList: as a Function<String, List>
+        // the method reference binds to the varargs overload newArrayList(E...), which would
+        // seed the new list with the map key ("failedSources") as a bogus first log entry.
+        List<String> failedSourceLogs =
+                this.groupErrLogs.computeIfAbsent("failedSources", key -> Lists.newArrayList());
+        for (StreamSource failedSource : failedSources) {
+            failedSourceLogs.add(GsonUtil.toJson(failedSource));
+        }
+    }
+
public enum InlongGroupState {
CREATE, REJECTED, INITIALIZING, OPERATING, STARTED, FAILED, STOPPED, FINISHED, DELETED;
// Reference to org.apache.inlong.manager.common.enums.GroupState code
public static InlongGroupState parseByBizStatus(int bizCode) {
- switch (bizCode) {
- case 0:
- case 100:
+ GroupState groupState = GroupState.forCode(bizCode);
+
+ switch (groupState) {
+ case DRAFT:
+ case TO_BE_SUBMIT:
return CREATE;
- case 41:
- case 141:
- case 151:
+ case DELETING:
+ case SUSPENDING:
+ case RESTARTING:
return OPERATING;
- case 102:
+ case APPROVE_REJECTED:
return REJECTED;
- case 101:
- case 103:
- case 110:
+ case TO_BE_APPROVAL:
+ case APPROVE_PASSED:
+ case CONFIG_ING:
return INITIALIZING;
- case 120:
+ case CONFIG_FAILED:
return FAILED;
- case 130:
- case 150:
+ case CONFIG_SUCCESSFUL:
+ case RESTARTED:
return STARTED;
- case 140:
+ case SUSPENDED:
return STOPPED;
- case 131:
+ case FINISH:
return FINISHED;
- case 40:
+ case DELETED:
return DELETED;
default:
throw new IllegalArgumentException(String.format("Unsupported status %s for group", bizCode));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index 354d674..dca1183 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -20,12 +20,46 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
@Data
@ApiModel("Stream source configuration")
public abstract class StreamSource {
+    /**
+     * Client-side state of a stream source, condensed from the manager's {@link SourceState}.
+     *
+     * <p>NOTE: the constant {@code FROZING} is spelled as published; it is the intermediate
+     * state returned while a freeze is being issued.
+     */
+    public enum State {
+        INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
+
+        /**
+         * Translates a manager source status code into a client {@link State}.
+         *
+         * @param status status code, resolved through {@code SourceState.forCode}
+         * @return the client state that the resolved {@link SourceState} maps to
+         * @throws IllegalStateException if the resolved {@link SourceState} has no mapping
+         */
+        public static State parseByStatus(int status) {
+            final SourceState sourceState = SourceState.forCode(status);
+            final State parsed;
+            switch (sourceState) {
+                // Settled states map one-to-one.
+                case SOURCE_NORMAL:
+                    parsed = NORMAL;
+                    break;
+                case SOURCE_FROZEN:
+                    parsed = FROZEN;
+                    break;
+                case SOURCE_FAILED:
+                    parsed = FAILED;
+                    break;
+                case SOURCE_DISABLE:
+                    parsed = DELETE;
+                    break;
+                // New sources and pending add/activate operations are all reported as INIT.
+                case SOURCE_NEW:
+                case TO_BE_ISSUED_ADD:
+                case BEEN_ISSUED_ADD:
+                case TO_BE_ISSUED_ACTIVE:
+                case BEEN_ISSUED_ACTIVE:
+                    parsed = INIT;
+                    break;
+                // Pending freeze operations.
+                case TO_BE_ISSUED_FROZEN:
+                case BEEN_ISSUED_FROZEN:
+                    parsed = FROZING;
+                    break;
+                // Pending delete operations.
+                case TO_BE_ISSUED_DELETE:
+                case BEEN_ISSUED_DELETE:
+                    parsed = DELETING;
+                    break;
+                default:
+                    throw new IllegalStateException(
+                            String.format("Unsupported source state=%s for Inlong", sourceState));
+            }
+            return parsed;
+        }
+    }
+
public enum SyncType {
FULL, INCREMENT
}
@@ -39,10 +73,12 @@ public abstract class StreamSource {
@ApiModelProperty("Mac uuid of the agent running the task")
private String uuid = "";
+ @ApiModelProperty("State of stream source")
+ private State state;
+
public abstract SourceType getSourceType();
public abstract SyncType getSyncType();
public abstract DataFormat getDataFormat();
-
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 413ba45..43c7ec4 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -237,7 +237,7 @@ public class InlongGroupImpl implements InlongGroup {
groupLogs.computeIfAbsent(taskName, Lists::newArrayList).add(eventLogView.getRemark());
}
});
- inlongGroupContext.setErrMsgs(errMsgs);
+ inlongGroupContext.setGroupErrLogs(errMsgs);
inlongGroupContext.setGroupLogs(groupLogs);
}
//Fetch stream logs
@@ -254,7 +254,7 @@ public class InlongGroupImpl implements InlongGroup {
String log = GsonUtil.toJson(streamLog);
streamLogs.computeIfAbsent(componentName, Lists::newArrayList).add(log);
});
- inlongGroupContext.getStreamLogs().put(streamName, streamLogs);
+ inlongGroupContext.getStreamErrLogs().put(streamName, streamLogs);
}
});
return inlongGroupContext;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 0294a79..92a8e7a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.KafkaOffset;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamSource.State;
import org.apache.inlong.manager.client.api.StreamSource.SyncType;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.source.KafkaSource;
@@ -88,6 +89,7 @@ public class InlongStreamSourceTransfer {
kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
kafkaSource.setDataFormat(dataFormat);
+ kafkaSource.setState(State.parseByStatus(kafkaSourceResponse.getStatus()));
kafkaSource.setAgentIp(kafkaSourceResponse.getAgentIp());
kafkaSource.setTopic(kafkaSourceResponse.getTopic());
kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
@@ -106,7 +108,7 @@ public class InlongStreamSourceTransfer {
KafkaSource kafkaSource = new KafkaSource();
kafkaSource.setSourceName(kafkaResponse.getSourceName());
kafkaSource.setConsumerGroup(kafkaResponse.getGroupId());
-
+ kafkaSource.setState(State.parseByStatus(kafkaResponse.getStatus()));
DataFormat dataFormat = DataFormat.forName(kafkaResponse.getSerializationType());
kafkaSource.setDataFormat(dataFormat);
kafkaSource.setTopic(kafkaResponse.getTopic());
@@ -128,6 +130,7 @@ public class InlongStreamSourceTransfer {
binlogSource.setDataFormat(DataFormat.NONE);
binlogSource.setPort(response.getPort());
binlogSource.setAgentIp(response.getAgentIp());
+ binlogSource.setState(State.parseByStatus(response.getStatus()));
DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
response.getUser(),
response.getPassword());
@@ -153,6 +156,7 @@ public class InlongStreamSourceTransfer {
binlogSource.setHostname(response.getHostname());
binlogSource.setDataFormat(DataFormat.NONE);
binlogSource.setPort(response.getPort());
+ binlogSource.setState(State.parseByStatus(response.getStatus()));
DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
response.getUser(),
response.getPassword());