You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/26 09:13:15 UTC

[inlong] branch master updated: [INLONG-6248][Manager] Add Sort status query plugin interface and optimize status logic (#6249)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d885af128 [INLONG-6248][Manager] Add Sort status query plugin interface and optimize status logic (#6249)
d885af128 is described below

commit d885af1280e2ff3c8e653a9b9aa11e35e7ba9184
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Oct 26 17:13:09 2022 +0800

    [INLONG-6248][Manager] Add Sort status query plugin interface and optimize status logic (#6249)
---
 .../inlong/manager/client/api/InlongClient.java    |  10 ++
 .../inlong/manager/client/api/InlongGroup.java     |   9 ++
 .../manager/client/api/InlongGroupContext.java     |   9 ++
 .../manager/client/api/impl/BlankInlongGroup.java  |   5 +
 .../manager/client/api/impl/InlongClientImpl.java  |  21 ++++
 .../manager/client/api/impl/InlongGroupImpl.java   |  28 ++++++
 .../client/api/inner/client/InlongGroupClient.java |  18 ++++
 .../manager/client/api/service/InlongSortApi.java  |  32 ++++++
 .../inlong/manager/common/enums/SortStatus.java    |  55 +++++++++++
 .../manager/plugin/FlinkSortProcessPlugin.java     |   7 ++
 .../manager/plugin/poller/FlinkStatusPoller.java   | 110 +++++++++++++++++++++
 .../manager/pojo/group/InlongGroupStatusInfo.java  |   4 +
 .../ListSortStatusRequest.java}                    |  30 ++----
 .../ListSortStatusResponse.java}                   |  26 ++---
 .../inlong/manager/service/core/SortService.java   |  10 ++
 .../manager/service/core/impl/SortServiceImpl.java |  43 +++++++-
 .../service/source/AbstractSourceOperator.java     |  38 ++++---
 .../service/source/StreamSourceOperator.java       |   2 +-
 .../service/source/StreamSourceServiceImpl.java    |   2 +-
 .../web/controller/InlongSortController.java       |  49 +++++++++
 .../manager/workflow/plugin/ProcessPlugin.java     |   4 +
 .../{ProcessPlugin.java => SortStatusPoller.java}  |  29 ++----
 22 files changed, 458 insertions(+), 83 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index 9f6123789..d65660f3e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -104,6 +104,16 @@ public interface InlongClient {
      */
     Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds) throws Exception;
 
+    /**
+     * List group status, including the sort task status queried with the given credentials
+     *
+     * @param groupIds inlong group id list
+     * @param credentials auth info to query sort task such as sort cluster token
+     * @return map of inlong group id to its group status info
+     * @throws Exception the exception
+     */
+    Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) throws Exception;
+
     /**
      * Gets group.
      *
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index 67bae1cfe..ac4b36eeb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -43,6 +43,15 @@ public interface InlongGroup {
      */
     InlongGroupContext context() throws Exception;
 
+    /**
+     * Create snapshot for Inlong group with credential info such as sort token
+     *
+     * @param credentials credential info such as sort token
+     * @return inlong group context
+     * @throws Exception the exception
+     */
+    InlongGroupContext context(String credentials) throws Exception;
+
     /**
      * Init inlong group.
      * This operation will init all physical resources needed to start a stream group
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 52cca26a7..e151c59e6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
+import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
@@ -58,6 +59,8 @@ public class InlongGroupContext implements Serializable {
 
     private SimpleGroupStatus status;
 
+    private SortStatus sortStatus = SortStatus.UNKNOWN;
+
     private InlongGroupStatusInfo statusInfo;
 
     public InlongGroupContext(InnerGroupContext groupContext) {
@@ -73,6 +76,7 @@ public class InlongGroupContext implements Serializable {
                 .inlongGroupId(groupInfo.getInlongGroupId())
                 .originalStatus(groupInfo.getStatus())
                 .simpleGroupStatus(this.status)
+                .sortStatus(this.sortStatus)
                 .streamSources(getGroupSources()).build();
         this.extensions = Maps.newHashMap();
         List<InlongGroupExtInfo> extInfos = groupInfo.getExtList();
@@ -83,6 +87,11 @@ public class InlongGroupContext implements Serializable {
         }
     }
 
+    public void updateSortStatus(SortStatus sortStatus) {
+        this.sortStatus = sortStatus;
+        this.statusInfo.setSortStatus(sortStatus);
+    }
+
     private List<StreamSource> getGroupSources() {
         List<StreamSource> groupSources = Lists.newArrayList();
         this.inlongStreamMap.values().forEach(inlongStream -> {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index f119e0915..1b47eee73 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -44,6 +44,11 @@ public class BlankInlongGroup implements InlongGroup {
         throw new UnsupportedOperationException("Inlong group is not exists");
     }
 
+    @Override
+    public InlongGroupContext context(String credentials) throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
     @Override
     public InlongGroupContext init() throws Exception {
         throw new UnsupportedOperationException("Inlong group is not exists");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 715c21775..1a6c04c20 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -34,6 +34,7 @@ import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
+import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
@@ -50,6 +51,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 
 import java.util.List;
@@ -129,6 +131,7 @@ public class InlongClientImpl implements InlongClient {
 
         PageResult<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(request);
         List<InlongGroupBriefInfo> briefInfos = pageInfo.getList();
+
         Map<String, InlongGroupStatusInfo> groupStatusMap = Maps.newHashMap();
         if (CollectionUtils.isNotEmpty(briefInfos)) {
             briefInfos.forEach(briefInfo -> {
@@ -147,6 +150,24 @@ public class InlongClientImpl implements InlongClient {
         return groupStatusMap;
     }
 
+    @Override
+    public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) {
+        Map<String, InlongGroupStatusInfo> statusInfoMap = listGroupStatus(groupIds);
+
+        // sort status info
+        ListSortStatusRequest sortStatusRequest = new ListSortStatusRequest();
+        sortStatusRequest.setInlongGroupIds(groupIds);
+        sortStatusRequest.setCredentials(credentials);
+        Map<String, SortStatus> sortStatusMap = groupClient.listSortStatus(sortStatusRequest).getStatusMap();
+
+        if (MapUtils.isNotEmpty(sortStatusMap)) {
+            statusInfoMap.forEach((groupId, statusInfo) -> {
+                statusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.UNKNOWN));
+            });
+        }
+        return statusInfoMap;
+    }
+
     @Override
     public InlongGroup getGroup(String groupId) {
         InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index df65134b4..f7c4fb656 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,6 +19,8 @@ package org.apache.inlong.manager.client.api.impl;
 
 import com.google.common.base.Objects;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongGroup;
@@ -32,9 +34,11 @@ import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
 import org.apache.inlong.manager.client.api.inner.client.WorkflowClient;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
@@ -42,6 +46,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.pojo.workflow.TaskResponse;
@@ -49,6 +55,7 @@ import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.springframework.boot.configurationprocessor.json.JSONObject;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -102,6 +109,10 @@ public class InlongGroupImpl implements InlongGroup {
         return generateSnapshot();
     }
 
+    public InlongGroupContext context(String credentials) throws Exception {
+        return generateSnapshot(credentials);
+    }
+
     @Override
     public InlongGroupContext init() throws Exception {
         InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
@@ -280,6 +291,23 @@ public class InlongGroupImpl implements InlongGroup {
         return new InlongGroupContext(groupContext);
     }
 
+    private InlongGroupContext generateSnapshot(String credentials) {
+        InlongGroupContext groupContext = generateSnapshot();
+        InlongGroupInfo groupInfo = groupContext.getGroupInfo();
+        if (groupInfo.getExtList().stream().anyMatch(info -> InlongConstants.SORT_JOB_ID.equals(info.getKeyName())
+                && StringUtils.isNotEmpty(info.getKeyValue()))) {
+            ListSortStatusRequest request = new ListSortStatusRequest();
+            request.setInlongGroupIds(Collections.singletonList(groupInfo.getInlongGroupId()));
+            request.setCredentials(credentials);
+            ListSortStatusResponse sortStatusInfo = groupClient.listSortStatus(request);
+            if (MapUtils.isNotEmpty(sortStatusInfo.getStatusMap())) {
+                groupContext.updateSortStatus(sortStatusInfo.getStatusMap().getOrDefault(
+                        groupInfo.getInlongGroupId(), SortStatus.UNKNOWN));
+            }
+        }
+        return groupContext;
+    }
+
     private List<InlongStream> fetchInlongStreams(String groupId) {
         List<InlongStreamInfo> streamInfos = streamClient.listStreamInfo(groupId);
         if (CollectionUtils.isEmpty(streamInfos)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 64daa8d38..6aba39848 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.InlongGroupApi;
+import org.apache.inlong.manager.client.api.service.InlongSortApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -34,6 +35,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.springframework.boot.configurationprocessor.json.JSONObject;
 import retrofit2.Call;
@@ -47,9 +50,11 @@ import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD
 public class InlongGroupClient {
 
     private final InlongGroupApi inlongGroupApi;
+    private final InlongSortApi inlongSortApi;
 
     public InlongGroupClient(ClientConfiguration configuration) {
         inlongGroupApi = ClientUtils.createRetrofit(configuration).create(InlongGroupApi.class);
+        inlongSortApi = ClientUtils.createRetrofit(configuration).create(InlongSortApi.class);
     }
 
     /**
@@ -138,6 +143,19 @@ public class InlongGroupClient {
         return response.getData();
     }
 
+    /**
+     * List sort task status for inlong groups
+     *
+     * @param request sort status request
+     * @return response encapsulating the map of group id to sort status
+     */
+    public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
+        Response<ListSortStatusResponse> response = ClientUtils.executeHttpCall(
+                inlongSortApi.listSortStatus(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Create an inlong group
      */
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
new file mode 100644
index 000000000..fc0ce068f
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.service;
+
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.POST;
+
+public interface InlongSortApi {
+
+    @POST("sort/listStatus")
+    Call<Response<ListSortStatusResponse>> listSortStatus(@Body ListSortStatusRequest request);
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
new file mode 100644
index 000000000..c33f2e8e7
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Sort task status enum
+ */
+public enum SortStatus {
+
+    NOT_EXISTS(40, "job not exists"),
+    NEW(100, "job not started: draft, pending to run, etc"),
+    RUNNING(110, "job is running"),
+    PAUSED(120, "job is paused"),
+    STOPPED(130, "job has stopped without error, e.g canceled"),
+    FAILED(140, "job failed with error"),
+    FINISHED(200, "job finished successfully"),
+    OPERATING(300, "job in intermediate state such as restarting, canceling, etc"),
+    UNKNOWN(400, "job status unknown")
+    ;
+
+    @JsonValue
+    private final Integer code;
+    private final String description;
+
+    SortStatus(Integer code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index 896b770d4..be269c1ec 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -26,6 +26,8 @@ import org.apache.inlong.manager.plugin.listener.StartupSortListener;
 import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
 import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
 import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
+import org.apache.inlong.manager.plugin.poller.FlinkStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
@@ -57,4 +59,9 @@ public class FlinkSortProcessPlugin implements ProcessPlugin {
         listeners.add(new StartupStreamListener());
         return listeners;
     }
+
+    @Override
+    public SortStatusPoller createSortStatusPoller() {
+        return new FlinkStatusPoller();
+    }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
new file mode 100644
index 000000000..90efcde21
--- /dev/null
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.poller;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink sort task status poller for inlong groups
+ */
+@Slf4j
+public class FlinkStatusPoller implements SortStatusPoller {
+
+    /**
+     * Poll sort task status for groups
+     * @param groupInfos inlong groups whose sort task status is polled
+     * @param credentials not used for flink
+     * @return map of inlong group id to its sort task status
+     */
+    @Override
+    public Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfos, String credentials) {
+        Map<String, SortStatus> statusMap = new HashMap<>();
+        for (InlongGroupInfo groupInfo : groupInfos) {
+            String groupId = groupInfo.getInlongGroupId();
+            try {
+                List<InlongGroupExtInfo> extList = groupInfo.getExtList();
+                log.debug("inlong group {} ext info: {}", groupId, extList);
+
+                Map<String, String> kvConf = new HashMap<>();
+                extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
+                String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+                if (StringUtils.isNotEmpty(sortExt)) {
+                    Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                            JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                            });
+                    kvConf.putAll(result);
+                }
+
+                String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+                if (StringUtils.isBlank(jobId)) {
+                    statusMap.put(groupId, SortStatus.NOT_EXISTS);
+                    continue;
+                }
+
+                String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+                FlinkService flinkService = new FlinkService(sortUrl);
+                SortStatus status = convertToSortStatus(flinkService.getJobStatus(jobId));
+                statusMap.put(groupId, status);
+                log.info("get sort status = {} for flink jodId = {}, sortUrl = {}", status, jobId, sortUrl);
+            } catch (Exception e) {
+                log.error("polling sort status failed for group " + groupId, e);
+                statusMap.put(groupId, SortStatus.UNKNOWN);
+            }
+        }
+        return statusMap;
+    }
+
+    private SortStatus convertToSortStatus(JobStatus jobStatus) {
+        log.debug("flink jobStatus = {}", jobStatus);
+        switch (jobStatus) {
+            case CREATED:
+            case INITIALIZING:
+                return SortStatus.NEW;
+            case RUNNING:
+                return SortStatus.RUNNING;
+            case FAILED:
+                return SortStatus.FAILED;
+            case CANCELED:
+                return SortStatus.STOPPED;
+            case SUSPENDED:
+                return SortStatus.PAUSED;
+            case FINISHED:
+                return SortStatus.FINISHED;
+            case FAILING:
+            case CANCELLING:
+            case RESTARTING:
+            case RECONCILING:
+                return SortStatus.OPERATING;
+        }
+        return SortStatus.UNKNOWN;
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
index c0cf4e678..7f798d478 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -24,6 +24,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SortStatus;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 
 import java.util.List;
@@ -50,4 +51,7 @@ public class InlongGroupStatusInfo {
     @ApiModelProperty(value = "Stream sources in the inlong group")
     private List<StreamSource> streamSources;
 
+    @ApiModelProperty(value = "sort job status of the group")
+    private SortStatus sortStatus = SortStatus.UNKNOWN;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
similarity index 53%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
index c0cf4e678..98757e224 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
@@ -15,39 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group;
+package org.apache.inlong.manager.pojo.sort;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.pojo.source.StreamSource;
 
 import java.util.List;
 
 /**
- * Inlong group status info
+ * Sort status request
  */
 @Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Inlong group status info")
-public class InlongGroupStatusInfo {
+@ApiModel("list sort status request")
+public class ListSortStatusRequest {
 
-    @ApiModelProperty(value = "Inlong group id")
-    private String inlongGroupId;
+    @ApiModelProperty(value = "Inlong group ids")
+    private List<String> inlongGroupIds;
 
-    @ApiModelProperty(value = "Inlong group original status")
-    private Integer originalStatus;
-
-    @ApiModelProperty(value = "Inlong group simple status")
-    private SimpleGroupStatus simpleGroupStatus;
-
-    @ApiModelProperty(value = "Stream sources in the inlong group")
-    private List<StreamSource> streamSources;
+    @ApiModelProperty(value = "Optional credential info needed for backend query, such as sort cluster token")
+    private String credentials;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
index c0cf4e678..6b32b314a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group;
+package org.apache.inlong.manager.pojo.sort;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -23,31 +23,21 @@ import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.enums.SortStatus;
 
-import java.util.List;
+import java.util.Map;
 
 /**
- * Inlong group status info
+ * Response of the sort status list query, mapping inlong group ids to their sort status
  */
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Inlong group status info")
-public class InlongGroupStatusInfo {
+@ApiModel("list sort status response")
+public class ListSortStatusResponse {
 
-    @ApiModelProperty(value = "Inlong group id")
-    private String inlongGroupId;
-
-    @ApiModelProperty(value = "Inlong group original status")
-    private Integer originalStatus;
-
-    @ApiModelProperty(value = "Inlong group simple status")
-    private SimpleGroupStatus simpleGroupStatus;
-
-    @ApiModelProperty(value = "Stream sources in the inlong group")
-    private List<StreamSource> streamSources;
+    @ApiModelProperty(value = "group id to sort status mapping")
+    private Map<String, SortStatus> statusMap;
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 92d3ff3fe..7f24cc575 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -19,6 +19,8 @@ package org.apache.inlong.manager.service.core;
 
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
 
 /**
  * Sort Service
@@ -55,4 +57,12 @@ public interface SortService {
      * @return Response of sort cluster config
      */
     SortSourceConfigResponse getSourceConfig(String clusterName, String sortTaskId, String md5);
+
+    /**
+     * List the sort job status of the inlong groups given in the request.
+     *
+     * @param request Request params like group ids and sort cluster token
+     * @return Response holding the mapping from group id to sort job status
+     */
+    ListSortStatusResponse listSortStatus(ListSortStatusRequest request);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 0cf561fb9..b5aa0cb27 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -17,21 +17,37 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
 import org.apache.inlong.manager.service.core.SortClusterService;
 import org.apache.inlong.manager.service.core.SortSourceService;
 import org.apache.inlong.manager.service.core.SortService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.workflow.plugin.Plugin;
+import org.apache.inlong.manager.workflow.plugin.PluginBinder;
+import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
+import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Sort service implementation.
  */
 @Lazy
+@Slf4j
 @Service
-public class SortServiceImpl implements SortService {
+public class SortServiceImpl implements SortService, PluginBinder {
 
     @Lazy
     @Autowired
@@ -41,9 +57,13 @@ public class SortServiceImpl implements SortService {
     @Autowired
     private SortClusterService sortClusterService;
 
+    @Autowired
+    private InlongGroupService groupService;
+
+    private SortStatusPoller sortStatusPoller;
+
     @Override
     public SortClusterResponse getClusterConfig(String clusterName, String md5) {
-
         return sortClusterService.getClusterConfig(clusterName, md5);
     }
 
@@ -51,4 +71,42 @@ public class SortServiceImpl implements SortService {
     public SortSourceConfigResponse getSourceConfig(String clusterName, String sortTaskId, String md5) {
         return sortSourceService.getSourceConfig(clusterName, sortTaskId, md5);
     }
+
+    /**
+     * List the sort status of the requested inlong groups through the poller bound by a plugin.
+     *
+     * @param request group ids to query, plus the credentials handed to the poller as-is
+     * @return response wrapping the status map returned by the poller
+     */
+    @Override
+    public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
+        Preconditions.checkNotNull(sortStatusPoller, "sort job status poller not initialized");
+        // reject a missing request or id list up front with an explicit message, not a bare NPE
+        Preconditions.checkNotNull(request, "list sort status request cannot be null");
+        Preconditions.checkNotNull(request.getInlongGroupIds(), "inlong group ids cannot be null");
+
+        List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
+                .map(groupId -> groupService.get(groupId)).collect(Collectors.toList());
+        Map<String, SortStatus> statusMap = sortStatusPoller.poll(groupInfoList, request.getCredentials());
+        log.debug("get sort status map = {}", statusMap);
+        return ListSortStatusResponse.builder().statusMap(statusMap).build();
+    }
+
+    /**
+     * Bind the sort status poller offered by a process plugin.
+     *
+     * @param plugin plugin being loaded, plugins that are not a ProcessPlugin are ignored
+     */
+    @Override
+    public void acceptPlugin(Plugin plugin) {
+        if (!(plugin instanceof ProcessPlugin)) {
+            return;
+        }
+        // createSortStatusPoller() returns null by default, so a plugin without a poller
+        // must not wipe out the one bound by an earlier plugin
+        SortStatusPoller poller = ((ProcessPlugin) plugin).createSortStatusPoller();
+        if (poller != null) {
+            sortStatusPoller = poller;
+        }
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 0d4a3f0b3..f132cd97e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -116,7 +116,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
 
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
-    public void updateOpt(SourceRequest request, Integer groupStatus, String operator) {
+    public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupMode, String operator) {
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
 
@@ -124,7 +124,10 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
             LOGGER.warn("auto push source {} can not be updated", entity.getSourceName());
             return;
         }
-        if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
+
+        boolean allowUpdate = InlongConstants.LIGHTWEIGHT_MODE.equals(groupMode)
+                || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
+        if (!allowUpdate) {
             throw new BusinessException(String.format("source=%s is not allowed to update, "
                     + "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
         }
@@ -157,21 +160,24 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         setTargetEntity(request, entity);
         entity.setModifier(operator);
 
-        // re-issue task if necessary
         entity.setPreviousStatus(entity.getStatus());
-        if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
-            entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-        } else {
-            switch (SourceStatus.forCode(entity.getStatus())) {
-                case SOURCE_NORMAL:
-                    entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    break;
-                case SOURCE_FAILED:
-                    entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
-                    break;
-                default:
-                    // others leave it be
-                    break;
+
+        // re-issue task if necessary
+        if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
+            if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+                entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
+            } else {
+                switch (SourceStatus.forCode(entity.getStatus())) {
+                    case SOURCE_NORMAL:
+                        entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
+                        break;
+                    case SOURCE_FAILED:
+                        entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
+                        break;
+                    default:
+                        // others leave it be
+                        break;
+                }
             }
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 181102561..573e30e6d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -96,7 +96,8 @@ public interface StreamSourceOperator {
      * @param groupStatus the belongs group status
+     * @param groupMode the belongs group mode, InlongConstants.STANDARD_MODE or InlongConstants.LIGHTWEIGHT_MODE
      * @param operator name of operator
      */
-    void updateOpt(SourceRequest request, Integer groupStatus, String operator);
+    void updateOpt(SourceRequest request, Integer groupStatus, Integer groupMode, String operator);
 
     /**
      * Stop the source task.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 4bbc65b89..2f2214ce1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -213,7 +213,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         if (CollectionUtils.isNotEmpty(streamFields)) {
             streamFields.forEach(streamField -> streamField.setId(null));
         }
-        sourceOperator.updateOpt(request, groupEntity.getStatus(), operator);
+        sourceOperator.updateOpt(request, groupEntity.getStatus(), groupEntity.getLightweight(), operator);
 
         LOGGER.info("success to update source info: {}", request);
         return true;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
new file mode 100644
index 000000000..7b0a39666
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.service.core.SortService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Inlong sort controller, exposes the REST API for listing sort job status by inlong groups.
+ */
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Sort-API")
+public class InlongSortController {
+
+    @Autowired
+    private SortService sortService;
+
+    @RequestMapping(value = "/sort/listStatus", method = RequestMethod.POST)
+    @ApiOperation(value = "List sort job status by inlong groups")
+    public Response<ListSortStatusResponse> listSortStatus(@RequestBody ListSortStatusRequest request) {
+        return Response.success(sortService.listSortStatus(request));
+    }
+
+}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index e771db27f..efbae225b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -45,4 +45,13 @@ public interface ProcessPlugin extends Plugin {
         return null;
     }
 
+    /**
+     * Create the poller used to query the sort status of inlong groups.
+     *
+     * @return the poller of this plugin, null by default when the plugin provides none
+     */
+    default SortStatusPoller createSortStatusPoller() {
+        return null;
+    }
+
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
similarity index 53%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
index e771db27f..295fbcff6 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
@@ -17,32 +17,15 @@
 
 package org.apache.inlong.manager.workflow.plugin;
 
-import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
+import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 
 import java.util.List;
+import java.util.Map;
 
 /**
- * Interface of process plugin.
+ * Sort task status poller interface: polls the sort status of the given inlong groups with the given credentials
  */
-public interface ProcessPlugin extends Plugin {
-
-    default List<SourceOperateListener> createSourceOperateListeners() {
-        return null;
-    }
-
-    default List<SinkOperateListener> createSinkOperateListeners() {
-        return null;
-    }
-
-    default List<QueueOperateListener> createQueueOperateListeners() {
-        return null;
-    }
-
-    default List<SortOperateListener> createSortOperateListeners() {
-        return null;
-    }
-
+public interface SortStatusPoller {
+    Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
 }