You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/27 08:10:34 UTC

[inlong] branch master updated: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces (#6299)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b96a5b41 [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces (#6299)
6b96a5b41 is described below

commit 6b96a5b4179815e295501738ba66757885dcf9a6
Author: healchow <he...@gmail.com>
AuthorDate: Thu Oct 27 16:10:28 2022 +0800

    [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces (#6299)
---
 .../manager/client/api/impl/InlongClientImpl.java  |  27 ++++---
 .../manager/client/api/impl/InlongGroupImpl.java   |  31 +++++---
 .../client/api/inner/client/InlongGroupClient.java |  13 ++--
 .../manager/client/api/service/InlongSortApi.java  |   8 +-
 .../inlong/manager/common/enums/SortStatus.java    |  13 ++--
 .../inlong/manager/common}/plugin/Plugin.java      |   2 +-
 .../manager/common}/plugin/PluginBinder.java       |   2 +-
 .../manager/common}/plugin/PluginDefinition.java   |  30 +++++---
 .../manager/plugin/FlinkSortPollerPlugin.java}     |  23 +++---
 .../manager/plugin/FlinkSortProcessPlugin.java     |   8 +-
 ...linkStatusPoller.java => SortStatusPoller.java} |  83 +++++++++++----------
 .../src/main/resources/META-INF/plugin.yaml        |   7 +-
 .../manager/pojo/group/InlongGroupStatusInfo.java  |   2 +-
 ...SortStatusResponse.java => SortStatusInfo.java} |  15 ++--
 ...rtStatusRequest.java => SortStatusRequest.java} |  12 ++-
 inlong-manager/manager-service/pom.xml             |  10 ++-
 .../inlong/manager/service/core/SortService.java   |  18 +++--
 .../manager/service/core/impl/SortServiceImpl.java |  57 ++++++++------
 .../service/listener/GroupTaskListenerFactory.java |   4 +-
 .../listener/StreamTaskListenerFactory.java        |   4 +-
 .../inlong/manager/service/plugin/JarHell.java     |  10 +--
 .../manager/service/plugin/PluginClassLoader.java  |  67 +++++++++--------
 .../manager/service/plugin/PluginService.java      |  49 ++++++------
 .../service/plugin/PluginClassLoaderTest.java      |  36 +++++----
 .../manager/service/plugin/PluginServiceTest.java  |  11 +--
 .../manager/service/sort/DisableZkForSortTest.java |   7 +-
 .../src/test/resources/application.properties      |   1 -
 .../resources/plugins/manager-plugin-example.jar   | Bin 93525 -> 96655 bytes
 .../web/controller/InlongSortController.java       |  14 ++--
 .../manager/workflow/plugin/ProcessPlugin.java     |   5 +-
 .../PollerPlugin.java}                             |  23 +++---
 .../SortPoller.java}                               |  21 ++++--
 32 files changed, 347 insertions(+), 266 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 1a6c04c20..2852e0a16 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -51,7 +51,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 
 import java.util.List;
@@ -152,20 +153,22 @@ public class InlongClientImpl implements InlongClient {
 
     @Override
     public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) {
-        Map<String, InlongGroupStatusInfo> statusInfoMap = listGroupStatus(groupIds);
+        Map<String, InlongGroupStatusInfo> groupStatusMap = listGroupStatus(groupIds);
 
         // sort status info
-        ListSortStatusRequest sortStatusRequest = new ListSortStatusRequest();
-        sortStatusRequest.setInlongGroupIds(groupIds);
-        sortStatusRequest.setCredentials(credentials);
-        Map<String, SortStatus> sortStatusMap = groupClient.listSortStatus(sortStatusRequest).getStatusMap();
-
-        if (MapUtils.isNotEmpty(sortStatusMap)) {
-            statusInfoMap.forEach((groupId, statusInfo) -> {
-                statusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.UNKNOWN));
-            });
+        SortStatusRequest statusRequest = new SortStatusRequest();
+        statusRequest.setInlongGroupIds(groupIds);
+        statusRequest.setCredentials(credentials);
+        List<SortStatusInfo> sortStatusInfos = groupClient.listSortStatus(statusRequest);
+
+        if (CollectionUtils.isNotEmpty(sortStatusInfos)) {
+            Map<String, SortStatus> sortStatusMap = sortStatusInfos.stream()
+                    .collect(Collectors.toMap(SortStatusInfo::getInlongGroupId, SortStatusInfo::getSortStatus));
+            groupStatusMap.forEach((groupId, groupStatusInfo) ->
+                    groupStatusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.NOT_EXISTS)));
         }
-        return statusInfoMap;
+
+        return groupStatusMap;
     }
 
     @Override
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index f7c4fb656..2f6538e22 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.impl;
 
 import com.google.common.base.Objects;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
@@ -46,8 +45,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.pojo.workflow.TaskResponse;
@@ -57,6 +56,7 @@ import org.springframework.boot.configurationprocessor.json.JSONObject;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -294,17 +294,26 @@ public class InlongGroupImpl implements InlongGroup {
     private InlongGroupContext generateSnapshot(String credentials) {
         InlongGroupContext groupContext = generateSnapshot();
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        if (groupInfo.getExtList().stream().anyMatch(info -> InlongConstants.SORT_JOB_ID.equals(info.getKeyName())
-                && StringUtils.isNotEmpty(info.getKeyValue()))) {
-            ListSortStatusRequest request = new ListSortStatusRequest();
-            request.setInlongGroupIds(Collections.singletonList(groupInfo.getInlongGroupId()));
+
+        SortStatus sortStatus = SortStatus.NOT_EXISTS;
+        if (groupInfo.getExtList().stream().anyMatch(ext -> InlongConstants.SORT_JOB_ID.equals(ext.getKeyName())
+                && StringUtils.isNotEmpty(ext.getKeyValue()))) {
+            SortStatusRequest request = new SortStatusRequest();
+            final String groupId = groupInfo.getInlongGroupId();
+            request.setInlongGroupIds(Collections.singletonList(groupId));
             request.setCredentials(credentials);
-            ListSortStatusResponse sortStatusInfo = groupClient.listSortStatus(request);
-            if (MapUtils.isNotEmpty(sortStatusInfo.getStatusMap())) {
-                groupContext.updateSortStatus(sortStatusInfo.getStatusMap().getOrDefault(
-                        groupInfo.getInlongGroupId(), SortStatus.UNKNOWN));
+            List<SortStatusInfo> statusInfos = groupClient.listSortStatus(request);
+            if (CollectionUtils.isNotEmpty(statusInfos)) {
+                Optional<SortStatusInfo> optional = statusInfos.stream()
+                        .filter(statusInfo -> groupId.equals(statusInfo.getInlongGroupId()))
+                        .findFirst();
+                if (optional.isPresent()) {
+                    sortStatus = optional.get().getSortStatus();
+                }
             }
         }
+        groupContext.updateSortStatus(sortStatus);
+
         return groupContext;
     }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 6aba39848..e1f58763d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -35,12 +35,14 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.springframework.boot.configurationprocessor.json.JSONObject;
 import retrofit2.Call;
 
+import java.util.List;
+
 import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
 import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
 
@@ -147,11 +149,10 @@ public class InlongGroupClient {
      * List sort task status for inlong groups
      *
      * @param request sort status request
-     * @return Response encapsulate of group id to sort status map
+     * @return list of sort status infos
      */
-    public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
-        Response<ListSortStatusResponse> response = ClientUtils.executeHttpCall(
-                inlongSortApi.listSortStatus(request));
+    public List<SortStatusInfo> listSortStatus(SortStatusRequest request) {
+        Response<List<SortStatusInfo>> response = ClientUtils.executeHttpCall(inlongSortApi.listStatus(request));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
index fc0ce068f..69881ed70 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
@@ -18,15 +18,17 @@
 package org.apache.inlong.manager.client.api.service;
 
 import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import retrofit2.Call;
 import retrofit2.http.Body;
 import retrofit2.http.POST;
 
+import java.util.List;
+
 public interface InlongSortApi {
 
     @POST("sort/listStatus")
-    Call<Response<ListSortStatusResponse>> listSortStatus(@Body ListSortStatusRequest request);
+    Call<Response<List<SortStatusInfo>>> listStatus(@Body SortStatusRequest request);
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
index c33f2e8e7..183b2d4a9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
@@ -25,14 +25,15 @@ import com.fasterxml.jackson.annotation.JsonValue;
 public enum SortStatus {
 
     NOT_EXISTS(40, "job not exists"),
-    NEW(100, "job not started: draft, pending to run, etc"),
+    NEW(100, "job not started: draft, pending, etc."),
     RUNNING(110, "job is running"),
-    PAUSED(120, "job is paused"),
-    STOPPED(130, "job has stopped without error, e.g canceled"),
-    FAILED(140, "job failed with error"),
+    PAUSED(120, "job was paused"),
+    STOPPED(130, "job stopped without error, e.g canceled"),
+    FAILED(140, "job failed with an error"),
     FINISHED(200, "job finished successfully"),
-    OPERATING(300, "job in intermediate state such as restarting, canceling, etc"),
-    UNKNOWN(400, "job status unknown")
+    OPERATING(300, "job in an intermediate status such as restarting, canceling, etc."),
+    UNKNOWN(400, "job status unknown"),
+
     ;
 
     @JsonValue
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/Plugin.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
similarity index 94%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/Plugin.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
index 27f200673..a8f0a4b75 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/Plugin.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.common.plugin;
 
 /**
  * Interface of plugin.
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginBinder.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
similarity index 94%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginBinder.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
index d90327e13..3086534dc 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginBinder.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.common.plugin;
 
 /**
  * Interface of plugin binder.
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
similarity index 63%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginDefinition.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
index c0d5f1856..f91080579 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginDefinition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
@@ -15,38 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.common.plugin;
 
 import lombok.Data;
 
+import java.util.List;
+
 /**
- * pluginDefinition should be defined in *.jar/META-INF/plugin.yaml
- * for example:
- *    name: test
- *    description: this plugin is use for test
- *    pluginClass: org.apache.inlong.plugin.testPlugin
+ * PluginDefinition should be defined in *.jar/META-INF/plugin.yaml.
+ * <p>For example:
+ *
+ * <pre>
+ *    name: test-plugin
+ *    description: this plugin is used for test
  *    javaVersion: 1.8 or 8
+ *    pluginClasses:
+ *      - org.apache.inlong.plugin.TestPlugin1
+ *      - org.apache.inlong.plugin.TestPlugin2
+ * </pre>
  */
 @Data
 public class PluginDefinition {
 
     /**
-     * name of plugin
+     * Name of plugin
      */
     private String name;
 
     /**
-     * description of plugin to be used for user help
+     * Description of plugin to be used for user help
      */
     private String description;
 
     /**
-     * java_version of plugin to be used for check validate
+     * Java version of plugin to be used for check validate
      */
     private String javaVersion;
 
     /**
-     * the full class name of plugin
+     * List of the full class names of the plugins
      */
-    private String pluginClass;
+    private List<String> pluginClasses;
+
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
similarity index 63%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
index 295fbcff6..6ab32b2fb 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
@@ -15,17 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.plugin;
 
-import org.apache.inlong.manager.common.enums.SortStatus;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-
-import java.util.List;
-import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.plugin.poller.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
 
 /**
- * Sort task status poller interface for inlong groups
+ * Plugin of Flink Sort poller.
  */
-public interface SortStatusPoller {
-    Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
+@Slf4j
+public class FlinkSortPollerPlugin implements PollerPlugin {
+
+    @Override
+    public SortPoller getSortPoller() {
+        return new SortStatusPoller();
+    }
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index be269c1ec..e475414bf 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -26,8 +26,6 @@ import org.apache.inlong.manager.plugin.listener.StartupSortListener;
 import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
 import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
 import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
-import org.apache.inlong.manager.plugin.poller.FlinkStatusPoller;
-import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
@@ -36,7 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Plugin of flink sort process.
+ * Plugin of Flink Sort process.
  */
 @Slf4j
 public class FlinkSortProcessPlugin implements ProcessPlugin {
@@ -60,8 +58,4 @@ public class FlinkSortProcessPlugin implements ProcessPlugin {
         return listeners;
     }
 
-    @Override
-    public SortStatusPoller createSortStatusPoller() {
-        return new FlinkStatusPoller();
-    }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
similarity index 55%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
index 90efcde21..386979ecc 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.poller;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -26,9 +27,12 @@ import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,17 +41,38 @@ import java.util.Map;
  * Flink sort task status poller for inlong groups
  */
 @Slf4j
-public class FlinkStatusPoller implements SortStatusPoller {
+public class SortStatusPoller implements SortPoller {
 
     /**
-     * Poll sort task status for groups
-     * @param groupInfos group ids to poll
-     * @param credentials not used for flink
-     * @return
+     * Flink job status to InLong sort status mapping.
      */
+    private static final Map<JobStatus, SortStatus> JOB_SORT_STATUS_MAP = new HashMap<>(16);
+
+    static {
+        JOB_SORT_STATUS_MAP.put(JobStatus.CREATED, SortStatus.NEW);
+        JOB_SORT_STATUS_MAP.put(JobStatus.INITIALIZING, SortStatus.NEW);
+
+        JOB_SORT_STATUS_MAP.put(JobStatus.RUNNING, SortStatus.RUNNING);
+        JOB_SORT_STATUS_MAP.put(JobStatus.FAILED, SortStatus.FAILED);
+        JOB_SORT_STATUS_MAP.put(JobStatus.CANCELED, SortStatus.STOPPED);
+        JOB_SORT_STATUS_MAP.put(JobStatus.SUSPENDED, SortStatus.PAUSED);
+        JOB_SORT_STATUS_MAP.put(JobStatus.FINISHED, SortStatus.FINISHED);
+
+        JOB_SORT_STATUS_MAP.put(JobStatus.FAILING, SortStatus.OPERATING);
+        JOB_SORT_STATUS_MAP.put(JobStatus.CANCELLING, SortStatus.OPERATING);
+        JOB_SORT_STATUS_MAP.put(JobStatus.RESTARTING, SortStatus.OPERATING);
+        JOB_SORT_STATUS_MAP.put(JobStatus.RECONCILING, SortStatus.OPERATING);
+    }
+
     @Override
-    public Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfos, String credentials) {
-        Map<String, SortStatus> statusMap = new HashMap<>();
+    public List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, String credentials) {
+        log.debug("begin to poll sort status for inlong groups");
+        if (CollectionUtils.isEmpty(groupInfos)) {
+            log.debug("end to poll sort status, as the inlong groups is empty");
+            return Collections.emptyList();
+        }
+
+        List<SortStatusInfo> statusInfos = new ArrayList<>(groupInfos.size());
         for (InlongGroupInfo groupInfo : groupInfos) {
             String groupId = groupInfo.getInlongGroupId();
             try {
@@ -65,46 +90,26 @@ public class FlinkStatusPoller implements SortStatusPoller {
                 }
 
                 String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+                SortStatusInfo statusInfo = SortStatusInfo.builder().inlongGroupId(groupId).build();
                 if (StringUtils.isBlank(jobId)) {
-                    statusMap.put(groupId, SortStatus.NOT_EXISTS);
+                    statusInfo.setSortStatus(SortStatus.NOT_EXISTS);
+                    statusInfos.add(statusInfo);
                     continue;
                 }
 
                 String sortUrl = kvConf.get(InlongConstants.SORT_URL);
                 FlinkService flinkService = new FlinkService(sortUrl);
-                SortStatus status = convertToSortStatus(flinkService.getJobStatus(jobId));
-                statusMap.put(groupId, status);
-                log.info("get sort status = {} for flink jodId = {}, sortUrl = {}", status, jobId, sortUrl);
+                statusInfo.setSortStatus(
+                        JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), SortStatus.UNKNOWN)
+                );
+                statusInfos.add(statusInfo);
             } catch (Exception e) {
-                log.error("polling sort status failed for group " + groupId, e);
-                statusMap.put(groupId, SortStatus.UNKNOWN);
+                log.error("polling sort status failed for groupId=" + groupId, e);
             }
         }
-        return statusMap;
-    }
 
-    private SortStatus convertToSortStatus(JobStatus jobStatus) {
-        log.debug("flink jobStatus = {}", jobStatus);
-        switch (jobStatus) {
-            case CREATED:
-            case INITIALIZING:
-                return SortStatus.NEW;
-            case RUNNING:
-                return SortStatus.RUNNING;
-            case FAILED:
-                return SortStatus.FAILED;
-            case CANCELED:
-                return SortStatus.STOPPED;
-            case SUSPENDED:
-                return SortStatus.PAUSED;
-            case FINISHED:
-                return SortStatus.FINISHED;
-            case FAILING:
-            case CANCELLING:
-            case RESTARTING:
-            case RECONCILING:
-                return SortStatus.OPERATING;
-        }
-        return SortStatus.UNKNOWN;
+        log.debug("success to get sort status: {}", statusInfos);
+        return statusInfos;
     }
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml b/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
index 31a71af27..28a92c58f 100644
--- a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
+++ b/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
@@ -15,9 +15,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
-name: flink plugin
+name: flink-sort-plugin
 description: flink for manager plugin
 javaVersion: 1.8
-pluginClass: org.apache.inlong.manager.plugin.FlinkSortProcessPlugin
+pluginClasses:
+  - org.apache.inlong.manager.plugin.FlinkSortPollerPlugin
+  - org.apache.inlong.manager.plugin.FlinkSortProcessPlugin
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
index 7f798d478..9d62f0618 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -51,7 +51,7 @@ public class InlongGroupStatusInfo {
     @ApiModelProperty(value = "Stream sources in the inlong group")
     private List<StreamSource> streamSources;
 
-    @ApiModelProperty(value = "sort job status of the group")
+    @ApiModelProperty(value = "Sort job status of the group")
     private SortStatus sortStatus = SortStatus.UNKNOWN;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
similarity index 82%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
index 6b32b314a..bb4011899 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
@@ -25,19 +25,20 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.SortStatus;
 
-import java.util.Map;
-
 /**
- * Sort status response
+ * Sort status info
  */
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("list sort status response")
-public class ListSortStatusResponse {
+@ApiModel("Sort status info")
+public class SortStatusInfo {
+
+    @ApiModelProperty(value = "Inlong group id")
+    private String inlongGroupId;
 
-    @ApiModelProperty(value = "group id to sort status mapping")
-    private Map<String, SortStatus> statusMap;
+    @ApiModelProperty(value = "Sort status info")
+    private SortStatus sortStatus;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusRequest.java
similarity index 79%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusRequest.java
index 98757e224..d29368fe0 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusRequest.java
@@ -19,7 +19,10 @@ package org.apache.inlong.manager.pojo.sort;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.util.List;
 
@@ -27,13 +30,16 @@ import java.util.List;
  * Sort status request
  */
 @Data
-@ApiModel("list sort status request")
-public class ListSortStatusRequest {
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Sort status request")
+public class SortStatusRequest {
 
     @ApiModelProperty(value = "Inlong group ids")
     private List<String> inlongGroupIds;
 
-    @ApiModelProperty(value = "Optional credential info needed for backend query, such as sort cluster token")
+    @ApiModelProperty(value = "Optional credential info, such as the token of Sort cluster")
     private String credentials;
 
 }
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 0bb9ccaf0..68c5818e9 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -104,6 +104,11 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.shiro</groupId>
             <artifactId>shiro-core</artifactId>
@@ -550,8 +555,9 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.github.ben-manes.caffeine</groupId>
-            <artifactId>caffeine</artifactId>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 7f24cc575..610bf1918 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -17,15 +17,18 @@
 
 package org.apache.inlong.manager.service.core;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
+
+import java.util.List;
 
 /**
  * Sort Service
  */
 public interface SortService {
+
     /**
      * Get sort cluster config.
      *
@@ -59,10 +62,11 @@ public interface SortService {
     SortSourceConfigResponse getSourceConfig(String clusterName, String sortTaskId, String md5);
 
     /**
-     * Get sort job status.
+     * List Sort job status.
      *
-     * @param request Request params like group ids and sort cluster token
-     * @return Response of corresponding sort jobs' status
+     * @param request Sort status request, including inlong group ids, sort cluster token, etc.
+     * @return list of Sort job status
      */
-    ListSortStatusResponse listSortStatus(ListSortStatusRequest request);
+    List<SortStatusInfo> listSortStatus(SortStatusRequest request);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index b5aa0cb27..a8fd0677c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -18,27 +18,26 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import org.apache.inlong.manager.service.core.SortClusterService;
-import org.apache.inlong.manager.service.core.SortSourceService;
 import org.apache.inlong.manager.service.core.SortService;
+import org.apache.inlong.manager.service.core.SortSourceService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
-import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
-import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -52,15 +51,18 @@ public class SortServiceImpl implements SortService, PluginBinder {
     @Lazy
     @Autowired
     private SortSourceService sortSourceService;
-
     @Lazy
     @Autowired
     private SortClusterService sortClusterService;
-
     @Autowired
     private InlongGroupService groupService;
 
-    private SortStatusPoller sortStatusPoller;
+    /**
+     * The plugin poller will be initialized after the application starts.
+     *
+     * @see org.apache.inlong.manager.service.plugin.PluginService#afterPropertiesSet
+     */
+    private SortPoller sortPoller;
 
     @Override
     public SortClusterResponse getClusterConfig(String clusterName, String md5) {
@@ -73,21 +75,28 @@ public class SortServiceImpl implements SortService, PluginBinder {
     }
 
     @Override
-    public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
-        Preconditions.checkNotNull(sortStatusPoller, "sort job status poller not initialized");
-        List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
-                .map(groupId -> groupService.get(groupId)).collect(Collectors.toList());
-        Map<String, SortStatus> statusMap = sortStatusPoller.poll(groupInfoList, request.getCredentials());
-        log.debug("get sort status map = {}", statusMap);
-        return ListSortStatusResponse.builder().statusMap(statusMap).build();
+    public List<SortStatusInfo> listSortStatus(SortStatusRequest request) {
+        Preconditions.checkNotNull(sortPoller, "sort status poller not initialized, please try later");
+
+        try {
+            List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
+                    .map(groupId -> groupService.get(groupId))
+                    .collect(Collectors.toList());
+
+            List<SortStatusInfo> statusInfos = sortPoller.pollSortStatus(groupInfoList, request.getCredentials());
+            log.debug("success to list sort status for request={}, result={}", request, statusInfos);
+            return statusInfos;
+        } catch (Exception e) {
+            log.error("poll sort status error: ", e);
+            throw new BusinessException("poll sort status error: " + e.getMessage());
+        }
     }
 
     @Override
     public void acceptPlugin(Plugin plugin) {
-        if (!(plugin instanceof ProcessPlugin)) {
-            return;
+        if (plugin instanceof PollerPlugin) {
+            PollerPlugin pollerPlugin = (PollerPlugin) plugin;
+            sortPoller = pollerPlugin.getSortPoller();
         }
-        ProcessPlugin processPlugin = (ProcessPlugin) plugin;
-        sortStatusPoller = processPlugin.createSortStatusPoller();
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
index 9e0328d94..aa5e57889 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
@@ -33,8 +33,8 @@ import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index d14cf9052..110fe109f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -30,8 +30,8 @@ import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java
index 62c1b0bb6..62dad0869 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java
@@ -43,8 +43,7 @@ public class JarHell {
         JavaVersion version = parse(javaVersion);
         if (CURRENT_VERSION.compareTo(version) < 0) {
             throw new IllegalArgumentException(
-                    String.format("%s requires Java %s:, your system: %s", resource, CURRENT_VERSION,
-                            javaVersion));
+                    String.format("%s requires Java %s, your system: %s", resource, CURRENT_VERSION, javaVersion));
         }
     }
 
@@ -76,7 +75,7 @@ public class JarHell {
     }
 
     /**
-     * Check the string if is vaild.
+     * Check whether the string is valid.
      */
     public static boolean isValid(String value) {
         return value.matches("^0*[0-9]+(\\.[0-9]+)*(-[a-zA-Z0-9]+)?$");
@@ -127,7 +126,7 @@ public class JarHell {
                 return -1;
             } else if (prePart == null && o.prePart != null) {
                 return 1;
-            } else if (prePart != null && o.prePart != null) {
+            } else if (prePart != null) {
                 return comparePrePart(prePart, o.prePart);
             }
             return 0;
@@ -138,8 +137,7 @@ public class JarHell {
                 return otherPrePart.matches("\\d+")
                         ? (new BigInteger(prePart)).compareTo(new BigInteger(otherPrePart)) : -1;
             } else {
-                return otherPrePart.matches("\\d+")
-                        ? 1 : prePart.compareTo(otherPrePart);
+                return otherPrePart.matches("\\d+") ? 1 : prePart.compareTo(otherPrePart);
             }
         }
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
index 5eb5a28ec..e8088f18e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
@@ -24,7 +24,8 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.workflow.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.io.File;
 import java.io.IOException;
@@ -57,19 +58,27 @@ public class PluginClassLoader extends URLClassLoader {
      * plugin.yaml should less than 1k
      */
     public static final int PLUGIN_DEF_CAPACITY = 1024;
+    private static final ObjectMapper YAML_MAPPER;
+
+    static {
+        YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+        YAML_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        YAML_MAPPER.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+        YAML_MAPPER.setSerializationInclusion(Include.NON_NULL);
+    }
+
     private final File pluginDirectory;
+    private final String osName;
+
     /**
      * pluginName -> pluginDefinition
      */
     private Map<String, PluginDefinition> pluginDefinitionMap = new HashMap<>();
-    private ObjectMapper yamlMapper;
-    private String osName;
 
     private PluginClassLoader(URL url, ClassLoader parent, String osName) throws IOException {
         super(new URL[]{url}, parent);
         this.pluginDirectory = new File(url.getPath());
         this.osName = osName;
-        initYamlMapper();
         loadPluginDefinition();
     }
 
@@ -120,14 +129,6 @@ public class PluginClassLoader extends URLClassLoader {
         return this.pluginDefinitionMap;
     }
 
-    private void initYamlMapper() {
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
-        mapper.setSerializationInclusion(Include.NON_NULL);
-        this.yamlMapper = mapper;
-    }
-
     /**
      * load pluginDefinition in **.jar/META-INF/plugin.yaml
      */
@@ -141,46 +142,46 @@ public class PluginClassLoader extends URLClassLoader {
         List<PluginDefinition> definitions = new ArrayList<>();
         for (File jarFile : files) {
             if (!jarFile.getName().endsWith(".jar")) {
-                log.warn("{} is not valid plugin jar, skip to load", jarFile);
+                log.warn("invalid plugin jar {}, skip to load", jarFile);
                 continue;
             }
-            log.info("{} is valid plugin jar, start to load", jarFile);
+
+            log.info("start to load valid plugin jar {}", jarFile);
             JarFile pluginJar = new JarFile(jarFile);
             String pluginDef = readPluginDef(pluginJar);
             pluginDef = pluginDef.replaceAll("[\\x00]+", "");
-            PluginDefinition definition = yamlMapper.readValue(pluginDef, PluginDefinition.class);
+            PluginDefinition definition = YAML_MAPPER.readValue(pluginDef, PluginDefinition.class);
+
             if (osName.startsWith(WINDOWS_PREFIX)) {
-                addURL(new URL("file:///" + jarFile.getAbsolutePath()));
+                super.addURL(new URL("file:///" + jarFile.getAbsolutePath()));
             } else {
-                addURL(new URL("file://" + jarFile.getAbsolutePath()));
+                super.addURL(new URL("file://" + jarFile.getAbsolutePath()));
             }
             checkPluginValid(jarFile, definition);
             definitions.add(definition);
         }
+
         pluginDefinitionMap = definitions.stream()
                 .collect(Collectors.toMap(PluginDefinition::getName, definition -> definition));
     }
 
     private void checkPluginValid(File jarFile, PluginDefinition pluginDefinition) {
-        if (StringUtils.isEmpty(pluginDefinition.getName())) {
-            throw new RuntimeException(String.format("%s should define pluginName in plugin.yaml", jarFile.getName()));
-        }
-        if (StringUtils.isEmpty(pluginDefinition.getPluginClass())) {
-            throw new RuntimeException(String.format("%s should define pluginClass in plugin.yaml", jarFile.getName()));
-        }
-        try {
-            this.loadClass(pluginDefinition.getPluginClass());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(
-                    String.format("pluginClass '%s' could not be found in %s", pluginDefinition.getPluginClass(),
-                            jarFile.getName()));
-        }
+        String info = "[%s] not defined in plugin.yaml for " + jarFile.getName();
+        Preconditions.checkNotEmpty(pluginDefinition.getName(), String.format(info, "name"));
+        Preconditions.checkNotEmpty(pluginDefinition.getJavaVersion(), String.format(info, "javaVersion"));
+        Preconditions.checkNotEmpty(pluginDefinition.getPluginClasses(), String.format(info, "pluginClasses"));
         if (StringUtils.isEmpty(pluginDefinition.getDescription())) {
-            log.warn(String.format("%s should define description in plugin.yaml", jarFile.getName()));
+            log.warn(String.format(info, "description"));
         }
-        if (StringUtils.isEmpty(pluginDefinition.getJavaVersion())) {
-            throw new RuntimeException(String.format("%s should define javaVersion in plugin.yaml", jarFile.getName()));
+
+        for (String clazz : pluginDefinition.getPluginClasses()) {
+            try {
+                this.loadClass(clazz);
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(String.format("plugin class %s not found in %s", clazz, jarFile.getName()));
+            }
         }
+
         JarHell.checkJavaVersion(pluginDefinition.getName(), pluginDefinition.getJavaVersion());
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
index 9014304cc..1e23f6f76 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
@@ -22,9 +22,10 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
-import org.apache.inlong.manager.workflow.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -47,8 +48,7 @@ import java.util.Map;
 @Order(Ordered.LOWEST_PRECEDENCE)
 public class PluginService implements InitializingBean {
 
-
-    public static final String DEFAULT_PLUGIN_LOC = "plugins";
+    public static final String DEFAULT_PLUGIN_LOCATION = "plugins";
 
     @Getter
     private final List<Plugin> plugins = new ArrayList<>();
@@ -56,7 +56,7 @@ public class PluginService implements InitializingBean {
     @Setter
     @Getter
     @Value("${plugin.location?:}")
-    private String pluginLoc;
+    private String pluginLocation;
 
     @Getter
     @Autowired
@@ -67,10 +67,10 @@ public class PluginService implements InitializingBean {
 
     @Override
     public void afterPropertiesSet() throws Exception {
-        if (StringUtils.isEmpty(pluginLoc)) {
-            pluginLoc = DEFAULT_PLUGIN_LOC;
+        if (StringUtils.isEmpty(pluginLocation)) {
+            pluginLocation = DEFAULT_PLUGIN_LOCATION;
         }
-        log.info("pluginLoc:{}", pluginLoc);
+        log.info("plugin location is {}", pluginLocation);
         pluginReload();
     }
 
@@ -78,36 +78,41 @@ public class PluginService implements InitializingBean {
      * Reload the plugin from the plugin path
      */
     public void pluginReload() {
-        Path path = Paths.get(pluginLoc).toAbsolutePath();
+        Path path = Paths.get(pluginLocation).toAbsolutePath();
         log.info("search for plugin in {}", path);
         if (!path.toFile().exists()) {
             log.warn("plugin directory not found");
             return;
         }
+
         PluginClassLoader pluginLoader = PluginClassLoader.getFromPluginUrl(path.toString(),
                 Thread.currentThread().getContextClassLoader());
         Map<String, PluginDefinition> pluginDefinitions = pluginLoader.getPluginDefinitions();
         if (MapUtils.isEmpty(pluginDefinitions)) {
-            log.warn("pluginDefinition not found in {}", pluginLoc);
+            log.warn("plugin definition not found in {}", pluginLocation);
             return;
         }
+
         List<Plugin> plugins = new ArrayList<>();
         for (PluginDefinition pluginDefinition : pluginDefinitions.values()) {
-            String pluginClassName = pluginDefinition.getPluginClass();
-            try {
-                Class<?> pluginClass = pluginLoader.loadClass(pluginClassName);
-                Object plugin = pluginClass.getDeclaredConstructor().newInstance();
-                plugins.add((Plugin) plugin);
-            } catch (Throwable e) {
-                throw new RuntimeException(e.getMessage());
+            List<String> classNames = pluginDefinition.getPluginClasses();
+            for (String name : classNames) {
+                try {
+                    Class<?> pluginClass = pluginLoader.loadClass(name);
+                    Object plugin = pluginClass.getDeclaredConstructor().newInstance();
+                    plugins.add((Plugin) plugin);
+                } catch (Throwable e) {
+                    log.error("create plugin instance error: ", e);
+                    throw new BusinessException("create plugin instance error: " + e.getMessage());
+                }
             }
         }
         this.plugins.addAll(plugins);
-        for (PluginBinder pluginBinder : pluginBinders) {
+
+        for (PluginBinder binder : pluginBinders) {
             for (Plugin plugin : plugins) {
-                log.info(String.format("PluginBinder:%s load Plugin:%s",
-                        pluginBinder.getClass().getSimpleName(), plugin.getClass().getSimpleName()));
-                pluginBinder.acceptPlugin(plugin);
+                binder.acceptPlugin(plugin);
+                log.info("plugin {} loaded by plugin binder {}", plugin.getClass(), binder.getClass());
             }
         }
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java
index adf2891c4..007805050 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java
@@ -18,13 +18,15 @@
 package org.apache.inlong.manager.service.plugin;
 
 import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginDefinition;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -33,25 +35,33 @@ import java.util.Objects;
  */
 public class PluginClassLoaderTest {
 
+    /**
+     * The test plugin jar was packaged from manager-plugins,
+     * and named `manager-plugin-example.jar`.
+     */
     @Test
     public void testLoadPlugin() {
-
         String path = Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).getPath();
         PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
                 Thread.currentThread().getContextClassLoader());
         Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
         Assertions.assertEquals(1, pluginDefinitionMap.size());
+
         PluginDefinition pluginDefinition = Lists.newArrayList(pluginDefinitionMap.values()).get(0);
         Assertions.assertNotNull(pluginDefinition);
-        String pluginClass = pluginDefinition.getPluginClass();
-        Assertions.assertTrue(StringUtils.isNotEmpty(pluginClass));
-        try {
-            Class cls = pluginClassLoader.loadClass(pluginClass);
-            Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
-            Assertions.assertTrue(plugin instanceof ProcessPlugin);
-        } catch (Exception e) {
-            Assertions.assertTrue(e instanceof ClassNotFoundException);
-            Assertions.fail();
+        List<String> classNames = pluginDefinition.getPluginClasses();
+        Assertions.assertTrue(CollectionUtils.isNotEmpty(classNames));
+
+        for (String name : classNames) {
+            try {
+                Class<?> cls = pluginClassLoader.loadClass(name);
+                Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
+                Assertions.assertTrue(plugin instanceof ProcessPlugin
+                        || plugin instanceof PollerPlugin);
+            } catch (Exception e) {
+                Assertions.assertTrue(e instanceof ClassNotFoundException);
+                Assertions.fail();
+            }
         }
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java
index c186e6cbd..684d74427 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java
@@ -17,17 +17,18 @@
 
 package org.apache.inlong.manager.service.plugin;
 
+import org.apache.inlong.manager.common.plugin.Plugin;
 import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.io.File;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Test class for reload plugin.
@@ -43,15 +44,15 @@ public class PluginServiceTest extends ServiceBaseTest {
     public void testReloadPlugin() {
         String path = null;
         try {
-            path = Paths.get(this.getClass().getClassLoader().getResource("").toURI()).toString();
+            URI uri = Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).toURI();
+            path = Paths.get(uri).toString();
         } catch (URISyntaxException e) {
             Assertions.fail(e.getMessage());
         }
-        pluginService.setPluginLoc(path + File.separator + PLUGIN_NAME);
+        pluginService.setPluginLocation(path + File.separator + PLUGIN_NAME);
         pluginService.pluginReload();
         List<Plugin> pluginList = pluginService.getPlugins();
         Assertions.assertTrue(pluginList.size() > 0);
-        Assertions.assertTrue(pluginList.get(0) instanceof ProcessPlugin);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index bdb5a7a50..d671c9b9b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -19,9 +19,10 @@ package org.apache.inlong.manager.service.sort;
 
 import com.google.common.collect.Lists;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -32,11 +33,10 @@ import org.apache.inlong.manager.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.mocks.MockPlugin;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.common.enums.ProcessName;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.workflow.WorkflowServiceImplTest;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
@@ -57,7 +57,6 @@ import java.util.stream.Collectors;
  */
 public class DisableZkForSortTest extends WorkflowServiceImplTest {
 
-
     @Autowired
     protected InlongStreamService streamService;
 
diff --git a/inlong-manager/manager-service/src/test/resources/application.properties b/inlong-manager/manager-service/src/test/resources/application.properties
index 1512ce7e8..b16c38c79 100644
--- a/inlong-manager/manager-service/src/test/resources/application.properties
+++ b/inlong-manager/manager-service/src/test/resources/application.properties
@@ -15,7 +15,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
 # Encryption config, the suffix of value must be the same as the version.
 inlong.encrypt.version=1
diff --git a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar
index 841b1189f..fa2aaf612 100644
Binary files a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar and b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar differ
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
index 7b0a39666..95db04f03 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
@@ -20,15 +20,17 @@ package org.apache.inlong.manager.web.controller;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
 import org.apache.inlong.manager.service.core.SortService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+
 /**
  * Inlong sort controller.
  */
@@ -40,9 +42,9 @@ public class InlongSortController {
     @Autowired
     private SortService sortService;
 
-    @RequestMapping(value = "/sort/listStatus", method = RequestMethod.POST)
-    @ApiOperation(value = "List sort job status by inlong groups")
-    public Response<ListSortStatusResponse> listSortStatus(@RequestBody ListSortStatusRequest request) {
+    @PostMapping(value = "/sort/listStatus")
+    @ApiOperation(value = "List sort job status")
+    public Response<List<SortStatusInfo>> listSortStatus(@RequestBody SortStatusRequest request) {
         return Response.success(sortService.listSortStatus(request));
     }
 
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index efbae225b..7e15d5276 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.workflow.plugin;
 
+import org.apache.inlong.manager.common.plugin.Plugin;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -45,8 +46,4 @@ public interface ProcessPlugin extends Plugin {
         return null;
     }
 
-    default SortStatusPoller createSortStatusPoller() {
-        return null;
-    }
-
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/PollerPlugin.java
similarity index 67%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/PollerPlugin.java
index 295fbcff6..a59ed0060 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/PollerPlugin.java
@@ -15,17 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.workflow.plugin.sort;
 
-import org.apache.inlong.manager.common.enums.SortStatus;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.inlong.manager.common.plugin.Plugin;
 
 /**
- * Sort task status poller interface for inlong groups
+ * Interface of Poller Plugin.
  */
-public interface SortStatusPoller {
-    Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
+public interface PollerPlugin extends Plugin {
+
+    /**
+     * Get Sort poller instance.
+     *
+     * @return Sort poller instance
+     */
+    default SortPoller getSortPoller() {
+        return null;
+    }
+
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
similarity index 61%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
rename to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
index 295fbcff6..68dd24285 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
@@ -15,17 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.workflow.plugin.sort;
 
-import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
 
 import java.util.List;
-import java.util.Map;
 
 /**
- * Sort task status poller interface for inlong groups
+ * Sort poller interface, to be implemented by plugins as an extension point
  */
-public interface SortStatusPoller {
-    Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
+public interface SortPoller {
+
+    /**
+     * Poll the Sort status infos by the given inlong groups
+     *
+     * @param groupInfos inlong group infos
+     * @param credentials credential info
+     * @return list of Sort status infos
+     * @throws Exception if any error occurs while polling the Sort status
+     */
+    List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, String credentials) throws Exception;
+
 }