You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/26 09:13:15 UTC
[inlong] branch master updated: [INLONG-6248][Manager] Add Sort status query plugin interface and optimize status logic (#6249)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d885af128 [INLONG-6248][Manager] Add Sort status query plugin interface and optimize status logic (#6249)
d885af128 is described below
commit d885af1280e2ff3c8e653a9b9aa11e35e7ba9184
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Oct 26 17:13:09 2022 +0800
[INLONG-6248][Manager] Add Sort status query plugin interface and optimize status logic (#6249)
---
.../inlong/manager/client/api/InlongClient.java | 10 ++
.../inlong/manager/client/api/InlongGroup.java | 9 ++
.../manager/client/api/InlongGroupContext.java | 9 ++
.../manager/client/api/impl/BlankInlongGroup.java | 5 +
.../manager/client/api/impl/InlongClientImpl.java | 21 ++++
.../manager/client/api/impl/InlongGroupImpl.java | 28 ++++++
.../client/api/inner/client/InlongGroupClient.java | 18 ++++
.../manager/client/api/service/InlongSortApi.java | 32 ++++++
.../inlong/manager/common/enums/SortStatus.java | 55 +++++++++++
.../manager/plugin/FlinkSortProcessPlugin.java | 7 ++
.../manager/plugin/poller/FlinkStatusPoller.java | 110 +++++++++++++++++++++
.../manager/pojo/group/InlongGroupStatusInfo.java | 4 +
.../ListSortStatusRequest.java} | 30 ++----
.../ListSortStatusResponse.java} | 26 ++---
.../inlong/manager/service/core/SortService.java | 10 ++
.../manager/service/core/impl/SortServiceImpl.java | 43 +++++++-
.../service/source/AbstractSourceOperator.java | 38 ++++---
.../service/source/StreamSourceOperator.java | 2 +-
.../service/source/StreamSourceServiceImpl.java | 2 +-
.../web/controller/InlongSortController.java | 49 +++++++++
.../manager/workflow/plugin/ProcessPlugin.java | 4 +
.../{ProcessPlugin.java => SortStatusPoller.java} | 29 ++----
22 files changed, 458 insertions(+), 83 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index 9f6123789..d65660f3e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -104,6 +104,16 @@ public interface InlongClient {
*/
Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds) throws Exception;
+ /**
+ * List group status, including the sort task status of each group
+ *
+ * @param groupIds inlong group id list
+ * @param credentials auth info to query sort task such as sort cluster token
+ * @return map of inlong group id to its status info
+ * @throws Exception the exception
+ */
+ Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) throws Exception;
+
/**
* Gets group.
*
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index 67bae1cfe..ac4b36eeb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -43,6 +43,15 @@ public interface InlongGroup {
*/
InlongGroupContext context() throws Exception;
+ /**
+ * Create snapshot for Inlong group with credential info such as sort token
+ *
+ * @param credentials credential info such as sort token
+ * @return inlong group context
+ * @throws Exception the exception
+ */
+ InlongGroupContext context(String credentials) throws Exception;
+
/**
* Init inlong group.
* This operation will init all physical resources needed to start a stream group
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 52cca26a7..e151c59e6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
+import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
@@ -58,6 +59,8 @@ public class InlongGroupContext implements Serializable {
private SimpleGroupStatus status;
+ private SortStatus sortStatus = SortStatus.UNKNOWN;
+
private InlongGroupStatusInfo statusInfo;
public InlongGroupContext(InnerGroupContext groupContext) {
@@ -73,6 +76,7 @@ public class InlongGroupContext implements Serializable {
.inlongGroupId(groupInfo.getInlongGroupId())
.originalStatus(groupInfo.getStatus())
.simpleGroupStatus(this.status)
+ .sortStatus(this.sortStatus)
.streamSources(getGroupSources()).build();
this.extensions = Maps.newHashMap();
List<InlongGroupExtInfo> extInfos = groupInfo.getExtList();
@@ -83,6 +87,11 @@ public class InlongGroupContext implements Serializable {
}
}
+ public void updateSortStatus(SortStatus sortStatus) {
+ this.sortStatus = sortStatus;
+ this.statusInfo.setSortStatus(sortStatus);
+ }
+
private List<StreamSource> getGroupSources() {
List<StreamSource> groupSources = Lists.newArrayList();
this.inlongStreamMap.values().forEach(inlongStream -> {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index f119e0915..1b47eee73 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -44,6 +44,11 @@ public class BlankInlongGroup implements InlongGroup {
throw new UnsupportedOperationException("Inlong group is not exists");
}
+ @Override
+ public InlongGroupContext context(String credentials) throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
@Override
public InlongGroupContext init() throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 715c21775..1a6c04c20 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -34,6 +34,7 @@ import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
+import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
@@ -50,6 +51,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import java.util.List;
@@ -129,6 +131,7 @@ public class InlongClientImpl implements InlongClient {
PageResult<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(request);
List<InlongGroupBriefInfo> briefInfos = pageInfo.getList();
+
Map<String, InlongGroupStatusInfo> groupStatusMap = Maps.newHashMap();
if (CollectionUtils.isNotEmpty(briefInfos)) {
briefInfos.forEach(briefInfo -> {
@@ -147,6 +150,24 @@ public class InlongClientImpl implements InlongClient {
return groupStatusMap;
}
+ @Override
+ public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) {
+ Map<String, InlongGroupStatusInfo> statusInfoMap = listGroupStatus(groupIds);
+
+ // sort status info
+ ListSortStatusRequest sortStatusRequest = new ListSortStatusRequest();
+ sortStatusRequest.setInlongGroupIds(groupIds);
+ sortStatusRequest.setCredentials(credentials);
+ Map<String, SortStatus> sortStatusMap = groupClient.listSortStatus(sortStatusRequest).getStatusMap();
+
+ if (MapUtils.isNotEmpty(sortStatusMap)) {
+ statusInfoMap.forEach((groupId, statusInfo) -> {
+ statusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.UNKNOWN));
+ });
+ }
+ return statusInfoMap;
+ }
+
@Override
public InlongGroup getGroup(String groupId) {
InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index df65134b4..f7c4fb656 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,6 +19,8 @@ package org.apache.inlong.manager.client.api.impl;
import com.google.common.base.Objects;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongGroup;
@@ -32,9 +34,11 @@ import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
import org.apache.inlong.manager.client.api.inner.client.WorkflowClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
@@ -42,6 +46,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.pojo.workflow.TaskResponse;
@@ -49,6 +55,7 @@ import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.springframework.boot.configurationprocessor.json.JSONObject;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -102,6 +109,10 @@ public class InlongGroupImpl implements InlongGroup {
return generateSnapshot();
}
+ public InlongGroupContext context(String credentials) throws Exception {
+ return generateSnapshot(credentials);
+ }
+
@Override
public InlongGroupContext init() throws Exception {
InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
@@ -280,6 +291,23 @@ public class InlongGroupImpl implements InlongGroup {
return new InlongGroupContext(groupContext);
}
+ private InlongGroupContext generateSnapshot(String credentials) {
+ InlongGroupContext groupContext = generateSnapshot();
+ InlongGroupInfo groupInfo = groupContext.getGroupInfo();
+ if (groupInfo.getExtList().stream().anyMatch(info -> InlongConstants.SORT_JOB_ID.equals(info.getKeyName())
+ && StringUtils.isNotEmpty(info.getKeyValue()))) {
+ ListSortStatusRequest request = new ListSortStatusRequest();
+ request.setInlongGroupIds(Collections.singletonList(groupInfo.getInlongGroupId()));
+ request.setCredentials(credentials);
+ ListSortStatusResponse sortStatusInfo = groupClient.listSortStatus(request);
+ if (MapUtils.isNotEmpty(sortStatusInfo.getStatusMap())) {
+ groupContext.updateSortStatus(sortStatusInfo.getStatusMap().getOrDefault(
+ groupInfo.getInlongGroupId(), SortStatus.UNKNOWN));
+ }
+ }
+ return groupContext;
+ }
+
private List<InlongStream> fetchInlongStreams(String groupId) {
List<InlongStreamInfo> streamInfos = streamClient.listStreamInfo(groupId);
if (CollectionUtils.isEmpty(streamInfos)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 64daa8d38..6aba39848 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.service.InlongGroupApi;
+import org.apache.inlong.manager.client.api.service.InlongSortApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -34,6 +35,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import retrofit2.Call;
@@ -47,9 +50,11 @@ import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD
public class InlongGroupClient {
private final InlongGroupApi inlongGroupApi;
+ private final InlongSortApi inlongSortApi;
public InlongGroupClient(ClientConfiguration configuration) {
inlongGroupApi = ClientUtils.createRetrofit(configuration).create(InlongGroupApi.class);
+ inlongSortApi = ClientUtils.createRetrofit(configuration).create(InlongSortApi.class);
}
/**
@@ -138,6 +143,19 @@ public class InlongGroupClient {
return response.getData();
}
+ /**
+ * List sort task status for inlong groups
+ *
+ * @param request sort status request
+ * @return response encapsulating the map of group id to sort status
+ */
+ public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
+ Response<ListSortStatusResponse> response = ClientUtils.executeHttpCall(
+ inlongSortApi.listSortStatus(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Create an inlong group
*/
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
new file mode 100644
index 000000000..fc0ce068f
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.service;
+
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.POST;
+
+public interface InlongSortApi {
+
+ @POST("sort/listStatus")
+ Call<Response<ListSortStatusResponse>> listSortStatus(@Body ListSortStatusRequest request);
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
new file mode 100644
index 000000000..c33f2e8e7
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Sort task status enum
+ */
+public enum SortStatus {
+
+ NOT_EXISTS(40, "job not exists"),
+ NEW(100, "job not started: draft, pending to run, etc"),
+ RUNNING(110, "job is running"),
+ PAUSED(120, "job is paused"),
+ STOPPED(130, "job has stopped without error, e.g canceled"),
+ FAILED(140, "job failed with error"),
+ FINISHED(200, "job finished successfully"),
+ OPERATING(300, "job in intermediate state such as restarting, canceling, etc"),
+ UNKNOWN(400, "job status unknown")
+ ;
+
+ @JsonValue
+ private final Integer code;
+ private final String description;
+
+ SortStatus(Integer code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ public Integer getCode() {
+ return code;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index 896b770d4..be269c1ec 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -26,6 +26,8 @@ import org.apache.inlong.manager.plugin.listener.StartupSortListener;
import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
+import org.apache.inlong.manager.plugin.poller.FlinkStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
@@ -57,4 +59,9 @@ public class FlinkSortProcessPlugin implements ProcessPlugin {
listeners.add(new StartupStreamListener());
return listeners;
}
+
+ @Override
+ public SortStatusPoller createSortStatusPoller() {
+ return new FlinkStatusPoller();
+ }
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
new file mode 100644
index 000000000..90efcde21
--- /dev/null
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.poller;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink sort task status poller for inlong groups
+ */
+@Slf4j
+public class FlinkStatusPoller implements SortStatusPoller {
+
+ /**
+ * Poll sort task status for groups
+ * @param groupInfos inlong groups whose sort task status is polled
+ * @param credentials not used for flink
+ * @return map of inlong group id to the polled sort task status
+ */
+ @Override
+ public Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfos, String credentials) {
+ Map<String, SortStatus> statusMap = new HashMap<>();
+ for (InlongGroupInfo groupInfo : groupInfos) {
+ String groupId = groupInfo.getInlongGroupId();
+ try {
+ List<InlongGroupExtInfo> extList = groupInfo.getExtList();
+ log.debug("inlong group {} ext info: {}", groupId, extList);
+
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
+ }
+
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+ if (StringUtils.isBlank(jobId)) {
+ statusMap.put(groupId, SortStatus.NOT_EXISTS);
+ continue;
+ }
+
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
+ FlinkService flinkService = new FlinkService(sortUrl);
+ SortStatus status = convertToSortStatus(flinkService.getJobStatus(jobId));
+ statusMap.put(groupId, status);
+ log.info("get sort status = {} for flink jodId = {}, sortUrl = {}", status, jobId, sortUrl);
+ } catch (Exception e) {
+ log.error("polling sort status failed for group " + groupId, e);
+ statusMap.put(groupId, SortStatus.UNKNOWN);
+ }
+ }
+ return statusMap;
+ }
+
+ private SortStatus convertToSortStatus(JobStatus jobStatus) {
+ log.debug("flink jobStatus = {}", jobStatus);
+ switch (jobStatus) {
+ case CREATED:
+ case INITIALIZING:
+ return SortStatus.NEW;
+ case RUNNING:
+ return SortStatus.RUNNING;
+ case FAILED:
+ return SortStatus.FAILED;
+ case CANCELED:
+ return SortStatus.STOPPED;
+ case SUSPENDED:
+ return SortStatus.PAUSED;
+ case FINISHED:
+ return SortStatus.FINISHED;
+ case FAILING:
+ case CANCELLING:
+ case RESTARTING:
+ case RECONCILING:
+ return SortStatus.OPERATING;
+ }
+ return SortStatus.UNKNOWN;
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
index c0cf4e678..7f798d478 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -24,6 +24,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.pojo.source.StreamSource;
import java.util.List;
@@ -50,4 +51,7 @@ public class InlongGroupStatusInfo {
@ApiModelProperty(value = "Stream sources in the inlong group")
private List<StreamSource> streamSources;
+ @ApiModelProperty(value = "sort job status of the group")
+ private SortStatus sortStatus = SortStatus.UNKNOWN;
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
similarity index 53%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
index c0cf4e678..98757e224 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
@@ -15,39 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group;
+package org.apache.inlong.manager.pojo.sort;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.pojo.source.StreamSource;
import java.util.List;
/**
- * Inlong group status info
+ * Sort status request
*/
@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Inlong group status info")
-public class InlongGroupStatusInfo {
+@ApiModel("list sort status request")
+public class ListSortStatusRequest {
- @ApiModelProperty(value = "Inlong group id")
- private String inlongGroupId;
+ @ApiModelProperty(value = "Inlong group ids")
+ private List<String> inlongGroupIds;
- @ApiModelProperty(value = "Inlong group original status")
- private Integer originalStatus;
-
- @ApiModelProperty(value = "Inlong group simple status")
- private SimpleGroupStatus simpleGroupStatus;
-
- @ApiModelProperty(value = "Stream sources in the inlong group")
- private List<StreamSource> streamSources;
+ @ApiModelProperty(value = "Optional credential info needed for backend query, such as sort cluster token")
+ private String credentials;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
index c0cf4e678..6b32b314a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group;
+package org.apache.inlong.manager.pojo.sort;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -23,31 +23,21 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.enums.SortStatus;
-import java.util.List;
+import java.util.Map;
/**
- * Inlong group status info
+ * Sort status response
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Inlong group status info")
-public class InlongGroupStatusInfo {
+@ApiModel("list sort status response")
+public class ListSortStatusResponse {
- @ApiModelProperty(value = "Inlong group id")
- private String inlongGroupId;
-
- @ApiModelProperty(value = "Inlong group original status")
- private Integer originalStatus;
-
- @ApiModelProperty(value = "Inlong group simple status")
- private SimpleGroupStatus simpleGroupStatus;
-
- @ApiModelProperty(value = "Stream sources in the inlong group")
- private List<StreamSource> streamSources;
+ @ApiModelProperty(value = "group id to sort status mapping")
+ private Map<String, SortStatus> statusMap;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 92d3ff3fe..7f24cc575 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -19,6 +19,8 @@ package org.apache.inlong.manager.service.core;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
/**
* Sort Service
@@ -55,4 +57,12 @@ public interface SortService {
* @return Response of sort cluster config
*/
SortSourceConfigResponse getSourceConfig(String clusterName, String sortTaskId, String md5);
+
+ /**
+ * Get sort job status.
+ *
+ * @param request Request params, i.e. the inlong group ids to query and the credentials for the sort cluster
+ * @return Response of corresponding sort jobs' status
+ */
+ ListSortStatusResponse listSortStatus(ListSortStatusRequest request);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 0cf561fb9..b5aa0cb27 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -17,21 +17,37 @@
package org.apache.inlong.manager.service.core.impl;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.core.SortService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.workflow.plugin.Plugin;
+import org.apache.inlong.manager.workflow.plugin.PluginBinder;
+import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
+import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* Sort service implementation.
*/
@Lazy
+@Slf4j
@Service
-public class SortServiceImpl implements SortService {
+public class SortServiceImpl implements SortService, PluginBinder {
@Lazy
@Autowired
@@ -41,9 +57,13 @@ public class SortServiceImpl implements SortService {
@Autowired
private SortClusterService sortClusterService;
+ @Autowired
+ private InlongGroupService groupService;
+
+ private SortStatusPoller sortStatusPoller;
+
@Override
public SortClusterResponse getClusterConfig(String clusterName, String md5) {
-
return sortClusterService.getClusterConfig(clusterName, md5);
}
@@ -51,4 +71,23 @@ public class SortServiceImpl implements SortService {
public SortSourceConfigResponse getSourceConfig(String clusterName, String sortTaskId, String md5) {
return sortSourceService.getSourceConfig(clusterName, sortTaskId, md5);
}
+
+ /**
+ * Lists the sort status of the inlong groups named in the request.
+ * <p>
+ * Every group id is resolved through the group service, the resulting group infos are passed to the
+ * sort status poller together with the request credentials, and the map returned by the poller is
+ * wrapped into the response.
+ *
+ * @param request request providing the inlong group ids and the credentials handed to the poller
+ * @return response wrapping the status map returned by the poller
+ */
+ @Override
+ public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
+ // sortStatusPoller is assigned only in acceptPlugin, so it is null until a plugin has bound one
+ Preconditions.checkNotNull(sortStatusPoller, "sort job status poller not initialized");
+ // NOTE(review): request.getInlongGroupIds() is dereferenced without a null check - confirm it is validated upstream
+ List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
+ .map(groupId -> groupService.get(groupId)).collect(Collectors.toList());
+ Map<String, SortStatus> statusMap = sortStatusPoller.poll(groupInfoList, request.getCredentials());
+ log.debug("get sort status map = {}", statusMap);
+ return ListSortStatusResponse.builder().statusMap(statusMap).build();
+ }
+
+ /**
+ * Binds the sort status poller offered by a process plugin.
+ * <p>
+ * Plugins that are not a {@link ProcessPlugin} are ignored. A process plugin whose
+ * {@link ProcessPlugin#createSortStatusPoller()} returns null (the interface default) is ignored
+ * as well, so that it cannot reset a poller bound by a previously accepted plugin.
+ *
+ * @param plugin the plugin offered to this binder
+ */
+ @Override
+ public void acceptPlugin(Plugin plugin) {
+ if (!(plugin instanceof ProcessPlugin)) {
+ return;
+ }
+ SortStatusPoller poller = ((ProcessPlugin) plugin).createSortStatusPoller();
+ if (poller == null) {
+ // this plugin supplies no poller; keep whatever is already bound
+ return;
+ }
+ sortStatusPoller = poller;
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 0d4a3f0b3..f132cd97e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -116,7 +116,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
- public void updateOpt(SourceRequest request, Integer groupStatus, String operator) {
+ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupMode, String operator) {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
@@ -124,7 +124,10 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
LOGGER.warn("auto push source {} can not be updated", entity.getSourceName());
return;
}
- if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
+
+ boolean allowUpdate = InlongConstants.LIGHTWEIGHT_MODE.equals(groupMode)
+ || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
+ if (!allowUpdate) {
throw new BusinessException(String.format("source=%s is not allowed to update, "
+ "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
}
@@ -157,21 +160,24 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
setTargetEntity(request, entity);
entity.setModifier(operator);
- // re-issue task if necessary
entity.setPreviousStatus(entity.getStatus());
- if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
- entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
- } else {
- switch (SourceStatus.forCode(entity.getStatus())) {
- case SOURCE_NORMAL:
- entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
- break;
- case SOURCE_FAILED:
- entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
- break;
- default:
- // others leave it be
- break;
+
+ // re-issue task if necessary
+ if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
+ if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
+ } else {
+ switch (SourceStatus.forCode(entity.getStatus())) {
+ case SOURCE_NORMAL:
+ entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
+ break;
+ case SOURCE_FAILED:
+ entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
+ break;
+ default:
+ // others leave it be
+ break;
+ }
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 181102561..573e30e6d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -96,7 +96,7 @@ public interface StreamSourceOperator {
* @param groupStatus the belongs group status
* @param operator name of operator
*/
- void updateOpt(SourceRequest request, Integer groupStatus, String operator);
+ void updateOpt(SourceRequest request, Integer groupStatus, Integer groupMode, String operator);
/**
* Stop the source task.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 4bbc65b89..2f2214ce1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -213,7 +213,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
- sourceOperator.updateOpt(request, groupEntity.getStatus(), operator);
+ sourceOperator.updateOpt(request, groupEntity.getStatus(), groupEntity.getLightweight(), operator);
LOGGER.info("success to update source info: {}", request);
return true;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
new file mode 100644
index 000000000..7b0a39666
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.service.core.SortService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Inlong sort controller.
+ * <p>
+ * REST entry point for sort related queries; every call is delegated to {@link SortService}.
+ */
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Sort-API")
+public class InlongSortController {
+
+ @Autowired
+ private SortService sortService;
+
+ /**
+ * Handles POST /api/sort/listStatus by delegating to {@link SortService#listSortStatus}.
+ *
+ * @param request request body deserialized into a {@link ListSortStatusRequest}
+ * @return the service result wrapped by {@code Response.success}
+ */
+ @RequestMapping(value = "/sort/listStatus", method = RequestMethod.POST)
+ @ApiOperation(value = "List sort job status by inlong groups")
+ public Response<ListSortStatusResponse> listSortStatus(@RequestBody ListSortStatusRequest request) {
+ return Response.success(sortService.listSortStatus(request));
+ }
+
+}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index e771db27f..efbae225b 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -45,4 +45,8 @@ public interface ProcessPlugin extends Plugin {
return null;
}
+ /**
+ * Creates the sort status poller offered by this plugin.
+ *
+ * @return the poller; this default implementation returns null, so callers must handle a missing poller
+ */
+ default SortStatusPoller createSortStatusPoller() {
+ return null;
+ }
+
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
similarity index 53%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
index e771db27f..295fbcff6 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
@@ -17,32 +17,15 @@
package org.apache.inlong.manager.workflow.plugin;
-import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
+import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import java.util.List;
+import java.util.Map;
/**
- * Interface of process plugin.
+ * Sort task status poller interface for inlong groups
*/
-public interface ProcessPlugin extends Plugin {
-
- default List<SourceOperateListener> createSourceOperateListeners() {
- return null;
- }
-
- default List<SinkOperateListener> createSinkOperateListeners() {
- return null;
- }
-
- default List<QueueOperateListener> createQueueOperateListeners() {
- return null;
- }
-
- default List<SortOperateListener> createSortOperateListeners() {
- return null;
- }
-
+public interface SortStatusPoller {
+ /**
+ * Polls the sort status for the given inlong groups.
+ *
+ * @param groupInfo inlong groups whose sort status is requested
+ * @param credentials credentials supplied by the caller; presumably used to access the sort cluster - confirm
+ * against the implementations
+ * @return sort status per string key; presumably keyed by inlong group id - confirm against the implementations
+ */
+ Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
}