You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/11/10 02:23:56 UTC

[GitHub] [inlong] vernedeng commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

vernedeng commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018587699


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java:
##########
@@ -67,49 +62,12 @@ public boolean accept(String mqType) {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
-        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
-
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete kafka resource for groupId={}", groupId);
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            log.error("failed to delete kafka resource for groupId=" + groupId, e);
-            throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage());
-        }
-        log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo);
+        log.info("skip to delete kafka topic for groupId={}", groupInfo.getInlongGroupId());

Review Comment:
   why skip delete kafka topic directly?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java:
##########
@@ -81,6 +82,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("not build sort config for groupId={}, as the stream is empty", groupId);

Review Comment:
   
   ```suggestion
               LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
   ```



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,45 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);
+                /*WorkflowResult result = future.get(180, TimeUnit.SECONDS);
+                List<TaskResponse> tasks = result.getNewTasks();
+                if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                    log.error(errMsg);
+                    throw new WorkflowListenerException(errMsg);
+                }*/

Review Comment:
   plsase remove these commented code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org