Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/24 14:51:53 UTC

[GitHub] [incubator-inlong] healchow commented on a change in pull request #3356: [INLONG-3355][Manager] Check sourceState When return GroupState in Manager Client

healchow commented on a change in pull request #3356:
URL: https://github.com/apache/incubator-inlong/pull/3356#discussion_r834376564



##########
File path: inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
##########
@@ -85,29 +84,6 @@
     @ApiModelProperty(value = "Names of responsible persons, separated by commas")
     private String inCharges;
 
-    @ApiModelProperty(value = "Status")

Review comment:
       These fields should stay in the response so that users can still see them.

##########
File path: inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
##########
@@ -91,24 +93,53 @@ private void recheckState() {
         if (MapUtils.isEmpty(this.inlongStreamMap)) {
             return;
         }
+        List<StreamSource> sourcesInGroup = Lists.newArrayList();
         List<StreamSource> failedSources = Lists.newArrayList();
         this.inlongStreamMap.values().stream().forEach(inlongStream -> {
             Map<String, StreamSource> sources = inlongStream.getSources();
             if (MapUtils.isNotEmpty(sources)) {
                 for (Map.Entry<String, StreamSource> entry : sources.entrySet()) {
                     StreamSource source = entry.getValue();
-                    if (source.getState() == State.FAILED) {
-                        failedSources.add(source);
+                    if (source != null) {
+                        sourcesInGroup.add(source);
+                        if (source.getState() == State.FAILED) {
+                            failedSources.add(source);
+                        }
                     }
                 }
             }
         });
+        // check if any stream source is failed
         if (CollectionUtils.isNotEmpty(failedSources)) {
             this.state = InlongGroupState.FAILED;
             for (StreamSource failedSource : failedSources) {
                 this.groupErrLogs.computeIfAbsent("failedSources", Lists::newArrayList)
                         .add(GsonUtil.toJson(failedSource));
             }
+            return;
+        }
+        // check if any stream source is in indirect state
+        switch (this.state) {
+            case STARTED:
+                for (StreamSource source : sourcesInGroup) {
+                    if (!(source.getState() == State.NORMAL)) {

Review comment:
       Why not use `source.getState() != State.NORMAL`?
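
       A minimal sketch of why the two forms are interchangeable, assuming a hypothetical `State` enum that contains only the two constants visible in the diff (the real type may have more). The class name `StateCheckSketch` and both helper methods are made up for illustration and are not part of the pull request.

```java
import java.util.Arrays;
import java.util.List;

public class StateCheckSketch {
    // Hypothetical stand-in for the State type used in the diff.
    enum State { NORMAL, FAILED }

    // Form currently used in the pull request.
    static boolean notNormalNegated(State state) {
        return !(state == State.NORMAL);
    }

    // Form suggested in the review comment.
    static boolean notNormalDirect(State state) {
        return state != State.NORMAL;
    }

    public static void main(String[] args) {
        // null is included because reference comparison with == and != is null-safe.
        List<State> inputs = Arrays.asList(State.NORMAL, State.FAILED, null);
        for (State state : inputs) {
            if (notNormalNegated(state) != notNormalDirect(state)) {
                throw new AssertionError("forms disagree for " + state);
            }
        }
        System.out.println("both forms agree");
    }
}
```

       Both methods compare enum references, so they return the same result for every input, including `null`; the direct `!=` form simply reads more clearly.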

##########
File path: inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
##########
@@ -126,11 +126,6 @@ public void updateOpt(SourceRequest request, Integer groupStatus, String operato
         // Setting updated parameters of stream source entity.
         setTargetEntity(request, entity);
         entity.setVersion(entity.getVersion() + 1);
-        if (GroupState.forCode(groupStatus).equals(GroupState.CONFIG_SUCCESSFUL)) {

Review comment:
       Why were these lines removed?



