You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/26 09:30:14 UTC

[GitHub] [inlong] healchow opened a new pull request, #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

healchow opened a new pull request, #6299:
URL: https://github.com/apache/inlong/pull/6299

   ### Prepare a Pull Request
   
   - Fixes #6297
   
   ### Motivation
   
   Refactor the PollerPlugin and SortPoller interfaces.
   
   ### Modifications
   
   1. Refactor the Plugin define classes to manager-common.
   2. Change the result type of listSrtStatus APIs to support extension.
   
   ### Verifying this change
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1006474691


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java:
##########
@@ -294,17 +294,25 @@ private InlongGroupContext generateSnapshot() {
     private InlongGroupContext generateSnapshot(String credentials) {
         InlongGroupContext groupContext = generateSnapshot();
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        if (groupInfo.getExtList().stream().anyMatch(info -> InlongConstants.SORT_JOB_ID.equals(info.getKeyName())
-                && StringUtils.isNotEmpty(info.getKeyValue()))) {
-            ListSortStatusRequest request = new ListSortStatusRequest();
+        if (groupInfo.getExtList().stream().anyMatch(ext -> InlongConstants.SORT_JOB_ID.equals(ext.getKeyName())
+                && StringUtils.isNotEmpty(ext.getKeyValue()))) {
+            SortStatusRequest request = new SortStatusRequest();
             request.setInlongGroupIds(Collections.singletonList(groupInfo.getInlongGroupId()));
             request.setCredentials(credentials);
-            ListSortStatusResponse sortStatusInfo = groupClient.listSortStatus(request);
-            if (MapUtils.isNotEmpty(sortStatusInfo.getStatusMap())) {
-                groupContext.updateSortStatus(sortStatusInfo.getStatusMap().getOrDefault(
-                        groupInfo.getInlongGroupId(), SortStatus.UNKNOWN));
+            List<SortStatusInfo> statusInfos = groupClient.listSortStatus(request);
+
+            SortStatus sortStatus = SortStatus.UNKNOWN;
+            if (CollectionUtils.isNotEmpty(statusInfos)) {
+                Optional<SortStatusInfo> optional = statusInfos.stream()
+                        .filter(status -> groupInfo.getInlongGroupId().equals(status.getInlongGroupId()))
+                        .findFirst();
+                if (optional.isPresent()) {
+                    sortStatus = optional.get().getSortStatus();
+                }
             }
+            groupContext.updateSortStatus(sortStatus);
         }
+

Review Comment:
   Suggest updating status to NOT_EXISTS rather than the default UNKNOWN status at the else branch  of the If statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow merged pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
healchow merged PR #6299:
URL: https://github.com/apache/inlong/pull/6299


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1006496423


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java:
##########
@@ -294,17 +294,25 @@ private InlongGroupContext generateSnapshot() {
     private InlongGroupContext generateSnapshot(String credentials) {
         InlongGroupContext groupContext = generateSnapshot();
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        if (groupInfo.getExtList().stream().anyMatch(info -> InlongConstants.SORT_JOB_ID.equals(info.getKeyName())
-                && StringUtils.isNotEmpty(info.getKeyValue()))) {
-            ListSortStatusRequest request = new ListSortStatusRequest();
+        if (groupInfo.getExtList().stream().anyMatch(ext -> InlongConstants.SORT_JOB_ID.equals(ext.getKeyName())
+                && StringUtils.isNotEmpty(ext.getKeyValue()))) {
+            SortStatusRequest request = new SortStatusRequest();
             request.setInlongGroupIds(Collections.singletonList(groupInfo.getInlongGroupId()));
             request.setCredentials(credentials);
-            ListSortStatusResponse sortStatusInfo = groupClient.listSortStatus(request);
-            if (MapUtils.isNotEmpty(sortStatusInfo.getStatusMap())) {
-                groupContext.updateSortStatus(sortStatusInfo.getStatusMap().getOrDefault(
-                        groupInfo.getInlongGroupId(), SortStatus.UNKNOWN));
+            List<SortStatusInfo> statusInfos = groupClient.listSortStatus(request);
+
+            SortStatus sortStatus = SortStatus.UNKNOWN;
+            if (CollectionUtils.isNotEmpty(statusInfos)) {
+                Optional<SortStatusInfo> optional = statusInfos.stream()
+                        .filter(status -> groupInfo.getInlongGroupId().equals(status.getInlongGroupId()))
+                        .findFirst();
+                if (optional.isPresent()) {
+                    sortStatus = optional.get().getSortStatus();
+                }
             }
+            groupContext.updateSortStatus(sortStatus);
         }
+

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1005606256


##########
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.plugin.poller.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
+
+/**
+ * Plugin of Flink Sort poller.
+ */
+@Slf4j
+public class FlinkSortPollerPlugin implements PollerPlugin {

Review Comment:
   The plugin loader must now support multiple plugin classes in a single jar now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1005547957


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java:
##########
@@ -152,20 +153,22 @@ public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds)
 
     @Override
     public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) {
-        Map<String, InlongGroupStatusInfo> statusInfoMap = listGroupStatus(groupIds);
+        Map<String, InlongGroupStatusInfo> groupStatusMap = listGroupStatus(groupIds);
 
         // sort status info
-        ListSortStatusRequest sortStatusRequest = new ListSortStatusRequest();
-        sortStatusRequest.setInlongGroupIds(groupIds);
-        sortStatusRequest.setCredentials(credentials);
-        Map<String, SortStatus> sortStatusMap = groupClient.listSortStatus(sortStatusRequest).getStatusMap();
-
-        if (MapUtils.isNotEmpty(sortStatusMap)) {
-            statusInfoMap.forEach((groupId, statusInfo) -> {
-                statusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.UNKNOWN));
-            });
+        SortStatusRequest statusRequest = new SortStatusRequest();
+        statusRequest.setInlongGroupIds(groupIds);
+        statusRequest.setCredentials(credentials);
+        List<SortStatusInfo> sortStatusInfos = groupClient.listSortStatus(statusRequest);
+
+        if (CollectionUtils.isEmpty(sortStatusInfos)) {

Review Comment:
   I think you mean "isNotEmpty" here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] woofyzhao commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1005606256


##########
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.plugin.poller.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
+
+/**
+ * Plugin of Flink Sort poller.
+ */
+@Slf4j
+public class FlinkSortPollerPlugin implements PollerPlugin {

Review Comment:
   The plugin loader must now support multiple plugin classes in a single jar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1005609993


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java:
##########
@@ -73,21 +74,23 @@ public SortSourceConfigResponse getSourceConfig(String clusterName, String sortT
     }
 
     @Override
-    public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
-        Preconditions.checkNotNull(sortStatusPoller, "sort job status poller not initialized");
+    public List<SortStatusInfo> listSortStatus(SortStatusRequest request) {
+        Preconditions.checkNotNull(sortPoller, "sort status poller not initialized, please try later");
         List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
-                .map(groupId -> groupService.get(groupId)).collect(Collectors.toList());
-        Map<String, SortStatus> statusMap = sortStatusPoller.poll(groupInfoList, request.getCredentials());
-        log.debug("get sort status map = {}", statusMap);
-        return ListSortStatusResponse.builder().statusMap(statusMap).build();
+                .map(groupId -> groupService.get(groupId))
+                .collect(Collectors.toList());
+
+        List<SortStatusInfo> statusInfos = sortPoller.pollSortStatus(groupInfoList, request.getCredentials());
+        log.debug("success list sort status for request={}, result={}", request, statusInfos);
+
+        return statusInfos;
     }
 
     @Override
     public void acceptPlugin(Plugin plugin) {
-        if (!(plugin instanceof ProcessPlugin)) {
-            return;
+        if (plugin instanceof PollerPlugin) {
+            PollerPlugin pollerPlugin = (PollerPlugin) plugin;
+            sortPoller = pollerPlugin.getSortPoller();
         }
-        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
-        sortStatusPoller = processPlugin.createSortStatusPoller();
     }

Review Comment:
   plugin only support a class from define of plugin.yaml. You can't add `FlinkSortPollerPlugin`. Because It can't be loaded to execute `acceptPlugin` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1005625204


##########
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java:
##########
@@ -152,20 +153,22 @@ public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds)
 
     @Override
     public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) {
-        Map<String, InlongGroupStatusInfo> statusInfoMap = listGroupStatus(groupIds);
+        Map<String, InlongGroupStatusInfo> groupStatusMap = listGroupStatus(groupIds);
 
         // sort status info
-        ListSortStatusRequest sortStatusRequest = new ListSortStatusRequest();
-        sortStatusRequest.setInlongGroupIds(groupIds);
-        sortStatusRequest.setCredentials(credentials);
-        Map<String, SortStatus> sortStatusMap = groupClient.listSortStatus(sortStatusRequest).getStatusMap();
-
-        if (MapUtils.isNotEmpty(sortStatusMap)) {
-            statusInfoMap.forEach((groupId, statusInfo) -> {
-                statusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.UNKNOWN));
-            });
+        SortStatusRequest statusRequest = new SortStatusRequest();
+        statusRequest.setInlongGroupIds(groupIds);
+        statusRequest.setCredentials(credentials);
+        List<SortStatusInfo> sortStatusInfos = groupClient.listSortStatus(statusRequest);
+
+        if (CollectionUtils.isEmpty(sortStatusInfos)) {

Review Comment:
   yes, my bad...



##########
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java:
##########
@@ -37,17 +41,38 @@
  * Flink sort task status poller for inlong groups
  */
 @Slf4j
-public class FlinkStatusPoller implements SortStatusPoller {
+public class SortStatusPoller implements SortPoller {
 
     /**
-     * Poll sort task status for groups
-     * @param groupInfos group ids to poll
-     * @param credentials not used for flink
-     * @return
+     * Flink job status to InLong sort status mapping.
      */
+    private static final Map<JobStatus, SortStatus> JOB_SORT_STATUS_MAP = new HashMap<>(8);

Review Comment:
   okay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1005570824


##########
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java:
##########
@@ -37,17 +41,38 @@
  * Flink sort task status poller for inlong groups
  */
 @Slf4j
-public class FlinkStatusPoller implements SortStatusPoller {
+public class SortStatusPoller implements SortPoller {
 
     /**
-     * Poll sort task status for groups
-     * @param groupInfos group ids to poll
-     * @param credentials not used for flink
-     * @return
+     * Flink job status to InLong sort status mapping.
      */
+    private static final Map<JobStatus, SortStatus> JOB_SORT_STATUS_MAP = new HashMap<>(8);

Review Comment:
   8->16



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1006394543


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java:
##########
@@ -73,21 +74,23 @@ public SortSourceConfigResponse getSourceConfig(String clusterName, String sortT
     }
 
     @Override
-    public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
-        Preconditions.checkNotNull(sortStatusPoller, "sort job status poller not initialized");
+    public List<SortStatusInfo> listSortStatus(SortStatusRequest request) {
+        Preconditions.checkNotNull(sortPoller, "sort status poller not initialized, please try later");
         List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
-                .map(groupId -> groupService.get(groupId)).collect(Collectors.toList());
-        Map<String, SortStatus> statusMap = sortStatusPoller.poll(groupInfoList, request.getCredentials());
-        log.debug("get sort status map = {}", statusMap);
-        return ListSortStatusResponse.builder().statusMap(statusMap).build();
+                .map(groupId -> groupService.get(groupId))
+                .collect(Collectors.toList());
+
+        List<SortStatusInfo> statusInfos = sortPoller.pollSortStatus(groupInfoList, request.getCredentials());
+        log.debug("success list sort status for request={}, result={}", request, statusInfos);
+
+        return statusInfos;
     }
 
     @Override
     public void acceptPlugin(Plugin plugin) {
-        if (!(plugin instanceof ProcessPlugin)) {
-            return;
+        if (plugin instanceof PollerPlugin) {
+            PollerPlugin pollerPlugin = (PollerPlugin) plugin;
+            sortPoller = pollerPlugin.getSortPoller();
         }
-        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
-        sortStatusPoller = processPlugin.createSortStatusPoller();
     }

Review Comment:
   Change the `plugin.yaml` to support loading more than one class in the manager plugin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6299: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6299:
URL: https://github.com/apache/inlong/pull/6299#discussion_r1006394351


##########
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.plugin.poller.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
+
+/**
+ * Plugin of Flink Sort poller.
+ */
+@Slf4j
+public class FlinkSortPollerPlugin implements PollerPlugin {

Review Comment:
   Keep one jar, and support loading more than one class in the manager plugin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org