Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/07/11 06:40:47 UTC

[GitHub] [inlong] healchow opened a new pull request, #4954: [INLONG-4949][Manager] Support to extend different types of Sort protocol

healchow opened a new pull request, #4954:
URL: https://github.com/apache/inlong/pull/4954

   ### Prepare a Pull Request
   
   - Fixes #4949
   
   ### Motivation
   
   Allow the Manager to be extended with different types of Sort protocol.
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [X] This change is a trivial rework/code cleanup without any test coverage.
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] vernedeng commented on a diff in pull request #4954: [INLONG-4949][Manager] Support to extend different types of Sort protocol

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #4954:
URL: https://github.com/apache/inlong/pull/4954#discussion_r917610263


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java:
##########
@@ -77,92 +76,86 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
     private InlongClusterService clusterService;
 
     @Override
-    public TaskEvent event() {
-        return TaskEvent.COMPLETE;
+    public Boolean accept(Integer isNormal, Integer enableZk) {
+        return InlongConstants.NORMAL_MODE.equals(isNormal)
+                && InlongConstants.DISABLE_ZK.equals(enableZk);
     }
 
-    @SneakyThrows
     @Override
-    public ListenerResult listen(WorkflowContext context) {
-        log.info("Create sort config V2 for groupId={}", context.getProcessForm().getInlongGroupId());
-        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-        GroupOperateType groupOperateType = form.getGroupOperateType();
-        if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
-            return ListenerResult.success();
-        }
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        List<InlongStreamInfo> streamInfos = form.getStreamInfos();
-        int sinkCount = streamInfos.stream()
-                .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
-                .reduce(0, Integer::sum);
-        if (sinkCount == 0) {
-            log.warn("not any sink for group {} found, skip creating sort config", groupInfo.getInlongGroupId());
-            return ListenerResult.success();
+    public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
+            throws Exception {
+        if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");

Review Comment:
   stream infos **_are_** empty
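
The `accept(isNormal, enableZk)` method in the hunk above suggests that each operator declares which group settings it handles, so a caller can pick one at runtime. A minimal sketch of that selection step, assuming a hypothetical `SortConfigOperator` interface, a hypothetical `SortOperatorSelector`, and made-up flag values (the real `InlongConstants` values and the actual selection code are not shown in this thread):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical contract modelled on the accept() signature in the diff.
interface SortConfigOperator {
    Boolean accept(Integer isNormal, Integer enableZk);

    String name();
}

// Assumed flag values, for illustration only.
final class Flags {
    static final Integer NORMAL_MODE = 0;
    static final Integer DISABLE_ZK = 0;
    static final Integer ENABLE_ZK = 1;

    private Flags() {
    }
}

// Handles normal groups that do not use ZooKeeper, mirroring the diff.
class NormalGroupOperator implements SortConfigOperator {
    @Override
    public Boolean accept(Integer isNormal, Integer enableZk) {
        return Flags.NORMAL_MODE.equals(isNormal) && Flags.DISABLE_ZK.equals(enableZk);
    }

    @Override
    public String name() {
        return "normal-group";
    }
}

// Hypothetical second operator for groups with ZooKeeper enabled.
class ZkEnabledOperator implements SortConfigOperator {
    @Override
    public Boolean accept(Integer isNormal, Integer enableZk) {
        return Flags.ENABLE_ZK.equals(enableZk);
    }

    @Override
    public String name() {
        return "zk-enabled";
    }
}

// Returns the first registered operator that accepts the given flags.
class SortOperatorSelector {
    private final List<SortConfigOperator> operators;

    SortOperatorSelector(List<SortConfigOperator> operators) {
        this.operators = operators;
    }

    SortConfigOperator select(Integer isNormal, Integer enableZk) {
        for (SortConfigOperator operator : operators) {
            if (operator.accept(isNormal, enableZk)) {
                return operator;
            }
        }
        throw new IllegalArgumentException(
                "no sort operator for isNormal=" + isNormal + ", enableZk=" + enableZk);
    }
}

class SortOperatorDemo {
    public static void main(String[] args) {
        SortOperatorSelector selector = new SortOperatorSelector(
                Arrays.asList(new NormalGroupOperator(), new ZkEnabledOperator()));
        System.out.println(selector.select(Flags.NORMAL_MODE, Flags.DISABLE_ZK).name());
    }
}
```

With this shape, supporting another Sort protocol would mean registering one more operator rather than editing a listener, which appears to be the intent behind the PR title.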





[GitHub] [inlong] dockerzhang merged pull request #4954: [INLONG-4949][Manager] Support to extend different types of Sort protocol

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #4954:
URL: https://github.com/apache/inlong/pull/4954




[GitHub] [inlong] healchow commented on a diff in pull request #4954: [INLONG-4949][Manager] Support to extend different types of Sort protocol

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4954:
URL: https://github.com/apache/inlong/pull/4954#discussion_r917621975


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java:
##########
@@ -77,92 +76,86 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
     private InlongClusterService clusterService;
 
     @Override
-    public TaskEvent event() {
-        return TaskEvent.COMPLETE;
+    public Boolean accept(Integer isNormal, Integer enableZk) {
+        return InlongConstants.NORMAL_MODE.equals(isNormal)
+                && InlongConstants.DISABLE_ZK.equals(enableZk);
     }
 
-    @SneakyThrows
     @Override
-    public ListenerResult listen(WorkflowContext context) {
-        log.info("Create sort config V2 for groupId={}", context.getProcessForm().getInlongGroupId());
-        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-        GroupOperateType groupOperateType = form.getGroupOperateType();
-        if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
-            return ListenerResult.success();
-        }
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        List<InlongStreamInfo> streamInfos = form.getStreamInfos();
-        int sinkCount = streamInfos.stream()
-                .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
-                .reduce(0, Integer::sum);
-        if (sinkCount == 0) {
-            log.warn("not any sink for group {} found, skip creating sort config", groupInfo.getInlongGroupId());
-            return ListenerResult.success();
+    public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
+            throws Exception {
+        if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");

Review Comment:
   Okkk.





[GitHub] [inlong] healchow commented on a diff in pull request #4954: [INLONG-4949][Manager] Support to extend different types of Sort protocol

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #4954:
URL: https://github.com/apache/inlong/pull/4954#discussion_r917621975


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java:
##########
@@ -77,92 +76,86 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
     private InlongClusterService clusterService;
 
     @Override
-    public TaskEvent event() {
-        return TaskEvent.COMPLETE;
+    public Boolean accept(Integer isNormal, Integer enableZk) {
+        return InlongConstants.NORMAL_MODE.equals(isNormal)
+                && InlongConstants.DISABLE_ZK.equals(enableZk);
     }
 
-    @SneakyThrows
     @Override
-    public ListenerResult listen(WorkflowContext context) {
-        log.info("Create sort config V2 for groupId={}", context.getProcessForm().getInlongGroupId());
-        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-        GroupOperateType groupOperateType = form.getGroupOperateType();
-        if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
-            return ListenerResult.success();
-        }
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        List<InlongStreamInfo> streamInfos = form.getStreamInfos();
-        int sinkCount = streamInfos.stream()
-                .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
-                .reduce(0, Integer::sum);
-        if (sinkCount == 0) {
-            log.warn("not any sink for group {} found, skip creating sort config", groupInfo.getInlongGroupId());
-            return ListenerResult.success();
+    public void buildConfig(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, boolean isStream)
+            throws Exception {
+        if (groupInfo == null || CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("group info is null or stream infos is empty, no need to build sort config for disable zk");

Review Comment:
   It's a list, so "is empty" keeps the wording consistent with `CollectionUtils.isEmpty()`.
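
For context on the guard being discussed: `CollectionUtils.isEmpty()` treats a `null` collection the same as an empty one, so the single condition covers both cases. A small sketch of the same check with a local stand-in for `isEmpty` (the class name `BuildConfigGuardDemo` and the `shouldBuildConfig` helper are hypothetical, and `Object`/`String` stand in for the real group and stream types):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class BuildConfigGuardDemo {

    // Local stand-in for CollectionUtils.isEmpty(): true for null or empty.
    static boolean isEmpty(Collection<?> coll) {
        return coll == null || coll.isEmpty();
    }

    // Mirrors the guard in the diff: build only when both inputs are usable.
    static boolean shouldBuildConfig(Object groupInfo, List<String> streamInfos) {
        return groupInfo != null && !isEmpty(streamInfos);
    }

    public static void main(String[] args) {
        Object group = new Object();
        System.out.println(shouldBuildConfig(group, null));
        System.out.println(shouldBuildConfig(group, Collections.<String>emptyList()));
        System.out.println(shouldBuildConfig(group, Arrays.asList("stream-1")));
    }
}
```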


