You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/27 08:10:34 UTC
[inlong] branch master updated: [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces (#6299)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6b96a5b41 [INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces (#6299)
6b96a5b41 is described below
commit 6b96a5b4179815e295501738ba66757885dcf9a6
Author: healchow <he...@gmail.com>
AuthorDate: Thu Oct 27 16:10:28 2022 +0800
[INLONG-6297][Manager] Refactor the PollerPlugin and SortPoller interfaces (#6299)
---
.../manager/client/api/impl/InlongClientImpl.java | 27 ++++---
.../manager/client/api/impl/InlongGroupImpl.java | 31 +++++---
.../client/api/inner/client/InlongGroupClient.java | 13 ++--
.../manager/client/api/service/InlongSortApi.java | 8 +-
.../inlong/manager/common/enums/SortStatus.java | 13 ++--
.../inlong/manager/common}/plugin/Plugin.java | 2 +-
.../manager/common}/plugin/PluginBinder.java | 2 +-
.../manager/common}/plugin/PluginDefinition.java | 30 +++++---
.../manager/plugin/FlinkSortPollerPlugin.java} | 23 +++---
.../manager/plugin/FlinkSortProcessPlugin.java | 8 +-
...linkStatusPoller.java => SortStatusPoller.java} | 83 +++++++++++----------
.../src/main/resources/META-INF/plugin.yaml | 7 +-
.../manager/pojo/group/InlongGroupStatusInfo.java | 2 +-
...SortStatusResponse.java => SortStatusInfo.java} | 15 ++--
...rtStatusRequest.java => SortStatusRequest.java} | 12 ++-
inlong-manager/manager-service/pom.xml | 10 ++-
.../inlong/manager/service/core/SortService.java | 18 +++--
.../manager/service/core/impl/SortServiceImpl.java | 57 ++++++++------
.../service/listener/GroupTaskListenerFactory.java | 4 +-
.../listener/StreamTaskListenerFactory.java | 4 +-
.../inlong/manager/service/plugin/JarHell.java | 10 +--
.../manager/service/plugin/PluginClassLoader.java | 67 +++++++++--------
.../manager/service/plugin/PluginService.java | 49 ++++++------
.../service/plugin/PluginClassLoaderTest.java | 36 +++++----
.../manager/service/plugin/PluginServiceTest.java | 11 +--
.../manager/service/sort/DisableZkForSortTest.java | 7 +-
.../src/test/resources/application.properties | 1 -
.../resources/plugins/manager-plugin-example.jar | Bin 93525 -> 96655 bytes
.../web/controller/InlongSortController.java | 14 ++--
.../manager/workflow/plugin/ProcessPlugin.java | 5 +-
.../PollerPlugin.java} | 23 +++---
.../SortPoller.java} | 21 ++++--
32 files changed, 347 insertions(+), 266 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 1a6c04c20..2852e0a16 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -51,7 +51,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import java.util.List;
@@ -152,20 +153,22 @@ public class InlongClientImpl implements InlongClient {
@Override
public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds, String credentials) {
- Map<String, InlongGroupStatusInfo> statusInfoMap = listGroupStatus(groupIds);
+ Map<String, InlongGroupStatusInfo> groupStatusMap = listGroupStatus(groupIds);
// sort status info
- ListSortStatusRequest sortStatusRequest = new ListSortStatusRequest();
- sortStatusRequest.setInlongGroupIds(groupIds);
- sortStatusRequest.setCredentials(credentials);
- Map<String, SortStatus> sortStatusMap = groupClient.listSortStatus(sortStatusRequest).getStatusMap();
-
- if (MapUtils.isNotEmpty(sortStatusMap)) {
- statusInfoMap.forEach((groupId, statusInfo) -> {
- statusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.UNKNOWN));
- });
+ SortStatusRequest statusRequest = new SortStatusRequest();
+ statusRequest.setInlongGroupIds(groupIds);
+ statusRequest.setCredentials(credentials);
+ List<SortStatusInfo> sortStatusInfos = groupClient.listSortStatus(statusRequest);
+
+ if (CollectionUtils.isNotEmpty(sortStatusInfos)) {
+ Map<String, SortStatus> sortStatusMap = sortStatusInfos.stream()
+ .collect(Collectors.toMap(SortStatusInfo::getInlongGroupId, SortStatusInfo::getSortStatus));
+ groupStatusMap.forEach((groupId, groupStatusInfo) ->
+ groupStatusInfo.setSortStatus(sortStatusMap.getOrDefault(groupId, SortStatus.NOT_EXISTS)));
}
- return statusInfoMap;
+
+ return groupStatusMap;
}
@Override
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index f7c4fb656..2f6538e22 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.impl;
import com.google.common.base.Objects;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
@@ -46,8 +45,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.pojo.workflow.TaskResponse;
@@ -57,6 +56,7 @@ import org.springframework.boot.configurationprocessor.json.JSONObject;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -294,17 +294,26 @@ public class InlongGroupImpl implements InlongGroup {
private InlongGroupContext generateSnapshot(String credentials) {
InlongGroupContext groupContext = generateSnapshot();
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
- if (groupInfo.getExtList().stream().anyMatch(info -> InlongConstants.SORT_JOB_ID.equals(info.getKeyName())
- && StringUtils.isNotEmpty(info.getKeyValue()))) {
- ListSortStatusRequest request = new ListSortStatusRequest();
- request.setInlongGroupIds(Collections.singletonList(groupInfo.getInlongGroupId()));
+
+ SortStatus sortStatus = SortStatus.NOT_EXISTS;
+ if (groupInfo.getExtList().stream().anyMatch(ext -> InlongConstants.SORT_JOB_ID.equals(ext.getKeyName())
+ && StringUtils.isNotEmpty(ext.getKeyValue()))) {
+ SortStatusRequest request = new SortStatusRequest();
+ final String groupId = groupInfo.getInlongGroupId();
+ request.setInlongGroupIds(Collections.singletonList(groupId));
request.setCredentials(credentials);
- ListSortStatusResponse sortStatusInfo = groupClient.listSortStatus(request);
- if (MapUtils.isNotEmpty(sortStatusInfo.getStatusMap())) {
- groupContext.updateSortStatus(sortStatusInfo.getStatusMap().getOrDefault(
- groupInfo.getInlongGroupId(), SortStatus.UNKNOWN));
+ List<SortStatusInfo> statusInfos = groupClient.listSortStatus(request);
+ if (CollectionUtils.isNotEmpty(statusInfos)) {
+ Optional<SortStatusInfo> optional = statusInfos.stream()
+ .filter(statusInfo -> groupId.equals(statusInfo.getInlongGroupId()))
+ .findFirst();
+ if (optional.isPresent()) {
+ sortStatus = optional.get().getSortStatus();
+ }
}
}
+ groupContext.updateSortStatus(sortStatus);
+
return groupContext;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 6aba39848..e1f58763d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -35,12 +35,14 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import retrofit2.Call;
+import java.util.List;
+
import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
import static org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
@@ -147,11 +149,10 @@ public class InlongGroupClient {
* List sort task status for inlong groups
*
* @param request sort status request
- * @return Response encapsulate of group id to sort status map
+ * @return list of sort status infos
*/
- public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
- Response<ListSortStatusResponse> response = ClientUtils.executeHttpCall(
- inlongSortApi.listSortStatus(request));
+ public List<SortStatusInfo> listSortStatus(SortStatusRequest request) {
+ Response<List<SortStatusInfo>> response = ClientUtils.executeHttpCall(inlongSortApi.listStatus(request));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
index fc0ce068f..69881ed70 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongSortApi.java
@@ -18,15 +18,17 @@
package org.apache.inlong.manager.client.api.service;
import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.POST;
+import java.util.List;
+
public interface InlongSortApi {
@POST("sort/listStatus")
- Call<Response<ListSortStatusResponse>> listSortStatus(@Body ListSortStatusRequest request);
+ Call<Response<List<SortStatusInfo>>> listStatus(@Body SortStatusRequest request);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
index c33f2e8e7..183b2d4a9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SortStatus.java
@@ -25,14 +25,15 @@ import com.fasterxml.jackson.annotation.JsonValue;
public enum SortStatus {
NOT_EXISTS(40, "job not exists"),
- NEW(100, "job not started: draft, pending to run, etc"),
+ NEW(100, "job not started: draft, pending, etc."),
RUNNING(110, "job is running"),
- PAUSED(120, "job is paused"),
- STOPPED(130, "job has stopped without error, e.g canceled"),
- FAILED(140, "job failed with error"),
+ PAUSED(120, "job was paused"),
+ STOPPED(130, "job stopped without error, e.g canceled"),
+ FAILED(140, "job failed with an error"),
FINISHED(200, "job finished successfully"),
- OPERATING(300, "job in intermediate state such as restarting, canceling, etc"),
- UNKNOWN(400, "job status unknown")
+ OPERATING(300, "job in an intermediate status such as restarting, canceling, etc."),
+ UNKNOWN(400, "job status unknown"),
+
;
@JsonValue
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/Plugin.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
similarity index 94%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/Plugin.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
index 27f200673..a8f0a4b75 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/Plugin.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/Plugin.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.common.plugin;
/**
* Interface of plugin.
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginBinder.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
similarity index 94%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginBinder.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
index d90327e13..3086534dc 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginBinder.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginBinder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.common.plugin;
/**
* Interface of plugin binder.
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
similarity index 63%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginDefinition.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
index c0d5f1856..f91080579 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/PluginDefinition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/plugin/PluginDefinition.java
@@ -15,38 +15,46 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.common.plugin;
import lombok.Data;
+import java.util.List;
+
/**
- * pluginDefinition should be defined in *.jar/META-INF/plugin.yaml
- * for example:
- * name: test
- * description: this plugin is use for test
- * pluginClass: org.apache.inlong.plugin.testPlugin
+ * PluginDefinition should be defined in *.jar/META-INF/plugin.yaml.
+ * <p/> For example:
+ *
+ * <pre>
+ * name: test-plugin
+ * description: this plugin is used for test
* javaVersion: 1.8 or 8
+ * pluginClasses:
+ * - org.apache.inlong.plugin.TestPlugin1
+ * - org.apache.inlong.plugin.TestPlugin2
+ * </pre>
*/
@Data
public class PluginDefinition {
/**
- * name of plugin
+ * Name of plugin
*/
private String name;
/**
- * description of plugin to be used for user help
+ * Description of plugin to be used for user help
*/
private String description;
/**
- * java_version of plugin to be used for check validate
+ * Java version of plugin to be used for check validate
*/
private String javaVersion;
/**
- * the full class name of plugin
+ * List of the full class name of plugins
*/
- private String pluginClass;
+ private List<String> pluginClasses;
+
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
similarity index 63%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
index 295fbcff6..6ab32b2fb 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
@@ -15,17 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.plugin;
-import org.apache.inlong.manager.common.enums.SortStatus;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-
-import java.util.List;
-import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.plugin.poller.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
/**
- * Sort task status poller interface for inlong groups
+ * Plugin of Flink Sort poller.
*/
-public interface SortStatusPoller {
- Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
+@Slf4j
+public class FlinkSortPollerPlugin implements PollerPlugin {
+
+ @Override
+ public SortPoller getSortPoller() {
+ return new SortStatusPoller();
+ }
+
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index be269c1ec..e475414bf 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -26,8 +26,6 @@ import org.apache.inlong.manager.plugin.listener.StartupSortListener;
import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
-import org.apache.inlong.manager.plugin.poller.FlinkStatusPoller;
-import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
@@ -36,7 +34,7 @@ import java.util.LinkedList;
import java.util.List;
/**
- * Plugin of flink sort process.
+ * Plugin of Flink Sort process.
*/
@Slf4j
public class FlinkSortProcessPlugin implements ProcessPlugin {
@@ -60,8 +58,4 @@ public class FlinkSortProcessPlugin implements ProcessPlugin {
return listeners;
}
- @Override
- public SortStatusPoller createSortStatusPoller() {
- return new FlinkStatusPoller();
- }
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
similarity index 55%
rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
rename to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
index 90efcde21..386979ecc 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/FlinkStatusPoller.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.poller;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -26,9 +27,12 @@ import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,17 +41,38 @@ import java.util.Map;
* Flink sort task status poller for inlong groups
*/
@Slf4j
-public class FlinkStatusPoller implements SortStatusPoller {
+public class SortStatusPoller implements SortPoller {
/**
- * Poll sort task status for groups
- * @param groupInfos group ids to poll
- * @param credentials not used for flink
- * @return
+ * Flink job status to InLong sort status mapping.
*/
+ private static final Map<JobStatus, SortStatus> JOB_SORT_STATUS_MAP = new HashMap<>(16);
+
+ static {
+ JOB_SORT_STATUS_MAP.put(JobStatus.CREATED, SortStatus.NEW);
+ JOB_SORT_STATUS_MAP.put(JobStatus.INITIALIZING, SortStatus.NEW);
+
+ JOB_SORT_STATUS_MAP.put(JobStatus.RUNNING, SortStatus.RUNNING);
+ JOB_SORT_STATUS_MAP.put(JobStatus.FAILED, SortStatus.FAILED);
+ JOB_SORT_STATUS_MAP.put(JobStatus.CANCELED, SortStatus.STOPPED);
+ JOB_SORT_STATUS_MAP.put(JobStatus.SUSPENDED, SortStatus.PAUSED);
+ JOB_SORT_STATUS_MAP.put(JobStatus.FINISHED, SortStatus.FINISHED);
+
+ JOB_SORT_STATUS_MAP.put(JobStatus.FAILING, SortStatus.OPERATING);
+ JOB_SORT_STATUS_MAP.put(JobStatus.CANCELLING, SortStatus.OPERATING);
+ JOB_SORT_STATUS_MAP.put(JobStatus.RESTARTING, SortStatus.OPERATING);
+ JOB_SORT_STATUS_MAP.put(JobStatus.RECONCILING, SortStatus.OPERATING);
+ }
+
@Override
- public Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfos, String credentials) {
- Map<String, SortStatus> statusMap = new HashMap<>();
+ public List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, String credentials) {
+ log.debug("begin to poll sort status for inlong groups");
+ if (CollectionUtils.isEmpty(groupInfos)) {
+ log.debug("end to poll sort status, as the inlong groups is empty");
+ return Collections.emptyList();
+ }
+
+ List<SortStatusInfo> statusInfos = new ArrayList<>(groupInfos.size());
for (InlongGroupInfo groupInfo : groupInfos) {
String groupId = groupInfo.getInlongGroupId();
try {
@@ -65,46 +90,26 @@ public class FlinkStatusPoller implements SortStatusPoller {
}
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
+ SortStatusInfo statusInfo = SortStatusInfo.builder().inlongGroupId(groupId).build();
if (StringUtils.isBlank(jobId)) {
- statusMap.put(groupId, SortStatus.NOT_EXISTS);
+ statusInfo.setSortStatus(SortStatus.NOT_EXISTS);
+ statusInfos.add(statusInfo);
continue;
}
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
FlinkService flinkService = new FlinkService(sortUrl);
- SortStatus status = convertToSortStatus(flinkService.getJobStatus(jobId));
- statusMap.put(groupId, status);
- log.info("get sort status = {} for flink jodId = {}, sortUrl = {}", status, jobId, sortUrl);
+ statusInfo.setSortStatus(
+ JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), SortStatus.UNKNOWN)
+ );
+ statusInfos.add(statusInfo);
} catch (Exception e) {
- log.error("polling sort status failed for group " + groupId, e);
- statusMap.put(groupId, SortStatus.UNKNOWN);
+ log.error("polling sort status failed for groupId=" + groupId, e);
}
}
- return statusMap;
- }
- private SortStatus convertToSortStatus(JobStatus jobStatus) {
- log.debug("flink jobStatus = {}", jobStatus);
- switch (jobStatus) {
- case CREATED:
- case INITIALIZING:
- return SortStatus.NEW;
- case RUNNING:
- return SortStatus.RUNNING;
- case FAILED:
- return SortStatus.FAILED;
- case CANCELED:
- return SortStatus.STOPPED;
- case SUSPENDED:
- return SortStatus.PAUSED;
- case FINISHED:
- return SortStatus.FINISHED;
- case FAILING:
- case CANCELLING:
- case RESTARTING:
- case RECONCILING:
- return SortStatus.OPERATING;
- }
- return SortStatus.UNKNOWN;
+ log.debug("success to get sort status: {}", statusInfos);
+ return statusInfos;
}
+
}
diff --git a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml b/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
index 31a71af27..28a92c58f 100644
--- a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
+++ b/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
@@ -15,9 +15,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-name: flink plugin
+name: flink-sort-plugin
description: flink for manager plugin
javaVersion: 1.8
-pluginClass: org.apache.inlong.manager.plugin.FlinkSortProcessPlugin
+pluginClasses:
+ - org.apache.inlong.manager.plugin.FlinkSortPollerPlugin
+ - org.apache.inlong.manager.plugin.FlinkSortProcessPlugin
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
index 7f798d478..9d62f0618 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -51,7 +51,7 @@ public class InlongGroupStatusInfo {
@ApiModelProperty(value = "Stream sources in the inlong group")
private List<StreamSource> streamSources;
- @ApiModelProperty(value = "sort job status of the group")
+ @ApiModelProperty(value = "Sort job status of the group")
private SortStatus sortStatus = SortStatus.UNKNOWN;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
similarity index 82%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
index 6b32b314a..bb4011899 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusResponse.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusInfo.java
@@ -25,19 +25,20 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.SortStatus;
-import java.util.Map;
-
/**
- * Sort status response
+ * Sort status info
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("list sort status response")
-public class ListSortStatusResponse {
+@ApiModel("Sort status info")
+public class SortStatusInfo {
+
+ @ApiModelProperty(value = "Inlong group id")
+ private String inlongGroupId;
- @ApiModelProperty(value = "group id to sort status mapping")
- private Map<String, SortStatus> statusMap;
+ @ApiModelProperty(value = "Sort status info")
+ private SortStatus sortStatus;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusRequest.java
similarity index 79%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusRequest.java
index 98757e224..d29368fe0 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/ListSortStatusRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/SortStatusRequest.java
@@ -19,7 +19,10 @@ package org.apache.inlong.manager.pojo.sort;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.util.List;
@@ -27,13 +30,16 @@ import java.util.List;
* Sort status request
*/
@Data
-@ApiModel("list sort status request")
-public class ListSortStatusRequest {
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Sort status request")
+public class SortStatusRequest {
@ApiModelProperty(value = "Inlong group ids")
private List<String> inlongGroupIds;
- @ApiModelProperty(value = "Optional credential info needed for backend query, such as sort cluster token")
+ @ApiModelProperty(value = "Optional credential info, such as the token of Sort cluster")
private String credentials;
}
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 0bb9ccaf0..68c5818e9 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -104,6 +104,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
@@ -550,8 +555,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 7f24cc575..610bf1918 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -17,15 +17,18 @@
package org.apache.inlong.manager.service.core;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
+
+import java.util.List;
/**
* Sort Service
*/
public interface SortService {
+
/**
* Get sort cluster config.
*
@@ -59,10 +62,11 @@ public interface SortService {
SortSourceConfigResponse getSourceConfig(String clusterName, String sortTaskId, String md5);
/**
- * Get sort job status.
+ * List Sort job status.
*
- * @param request Request params like group ids and sort cluster token
- * @return Response of corresponding sort jobs' status
+ * @param request Sort status request, including inlong group ids,sort cluster token, etc.
+ * @return list of Sort job status
*/
- ListSortStatusResponse listSortStatus(ListSortStatusRequest request);
+ List<SortStatusInfo> listSortStatus(SortStatusRequest request);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index b5aa0cb27..a8fd0677c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -18,27 +18,26 @@
package org.apache.inlong.manager.service.core.impl;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.manager.common.enums.SortStatus;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.service.core.SortClusterService;
-import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.core.SortService;
+import org.apache.inlong.manager.service.core.SortSourceService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
-import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
-import org.apache.inlong.manager.workflow.plugin.SortStatusPoller;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.SortPoller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -52,15 +51,18 @@ public class SortServiceImpl implements SortService, PluginBinder {
@Lazy
@Autowired
private SortSourceService sortSourceService;
-
@Lazy
@Autowired
private SortClusterService sortClusterService;
-
@Autowired
private InlongGroupService groupService;
- private SortStatusPoller sortStatusPoller;
+ /**
+ * The plugin poller will be initialed after the application starts.
+ *
+ * @see org.apache.inlong.manager.service.plugin.PluginService#afterPropertiesSet
+ */
+ private SortPoller sortPoller;
@Override
public SortClusterResponse getClusterConfig(String clusterName, String md5) {
@@ -73,21 +75,28 @@ public class SortServiceImpl implements SortService, PluginBinder {
}
@Override
- public ListSortStatusResponse listSortStatus(ListSortStatusRequest request) {
- Preconditions.checkNotNull(sortStatusPoller, "sort job status poller not initialized");
- List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
- .map(groupId -> groupService.get(groupId)).collect(Collectors.toList());
- Map<String, SortStatus> statusMap = sortStatusPoller.poll(groupInfoList, request.getCredentials());
- log.debug("get sort status map = {}", statusMap);
- return ListSortStatusResponse.builder().statusMap(statusMap).build();
+ public List<SortStatusInfo> listSortStatus(SortStatusRequest request) {
+ Preconditions.checkNotNull(sortPoller, "sort status poller not initialized, please try later");
+
+ try {
+ List<InlongGroupInfo> groupInfoList = request.getInlongGroupIds().stream()
+ .map(groupId -> groupService.get(groupId))
+ .collect(Collectors.toList());
+
+ List<SortStatusInfo> statusInfos = sortPoller.pollSortStatus(groupInfoList, request.getCredentials());
+ log.debug("success to list sort status for request={}, result={}", request, statusInfos);
+ return statusInfos;
+ } catch (Exception e) {
+ log.error("poll sort status error: ", e);
+ throw new BusinessException("poll sort status error: " + e.getMessage());
+ }
}
@Override
public void acceptPlugin(Plugin plugin) {
- if (!(plugin instanceof ProcessPlugin)) {
- return;
+ if (plugin instanceof PollerPlugin) {
+ PollerPlugin pollerPlugin = (PollerPlugin) plugin;
+ sortPoller = pollerPlugin.getSortPoller();
}
- ProcessPlugin processPlugin = (ProcessPlugin) plugin;
- sortStatusPoller = processPlugin.createSortStatusPoller();
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
index 9e0328d94..aa5e57889 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
@@ -33,8 +33,8 @@ import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index d14cf9052..110fe109f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -30,8 +30,8 @@ import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java
index 62c1b0bb6..62dad0869 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/JarHell.java
@@ -43,8 +43,7 @@ public class JarHell {
JavaVersion version = parse(javaVersion);
if (CURRENT_VERSION.compareTo(version) < 0) {
throw new IllegalArgumentException(
- String.format("%s requires Java %s:, your system: %s", resource, CURRENT_VERSION,
- javaVersion));
+ String.format("%s requires Java %s, your system: %s", resource, CURRENT_VERSION, javaVersion));
}
}
@@ -76,7 +75,7 @@ public class JarHell {
}
/**
- * Check the string if is vaild.
+ * Check the string if is valid.
*/
public static boolean isValid(String value) {
return value.matches("^0*[0-9]+(\\.[0-9]+)*(-[a-zA-Z0-9]+)?$");
@@ -127,7 +126,7 @@ public class JarHell {
return -1;
} else if (prePart == null && o.prePart != null) {
return 1;
- } else if (prePart != null && o.prePart != null) {
+ } else if (prePart != null) {
return comparePrePart(prePart, o.prePart);
}
return 0;
@@ -138,8 +137,7 @@ public class JarHell {
return otherPrePart.matches("\\d+")
? (new BigInteger(prePart)).compareTo(new BigInteger(otherPrePart)) : -1;
} else {
- return otherPrePart.matches("\\d+")
- ? 1 : prePart.compareTo(otherPrePart);
+ return otherPrePart.matches("\\d+") ? 1 : prePart.compareTo(otherPrePart);
}
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
index 5eb5a28ec..e8088f18e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
@@ -24,7 +24,8 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.workflow.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.util.Preconditions;
import java.io.File;
import java.io.IOException;
@@ -57,19 +58,27 @@ public class PluginClassLoader extends URLClassLoader {
* plugin.yaml should less than 1k
*/
public static final int PLUGIN_DEF_CAPACITY = 1024;
+ private static final ObjectMapper YAML_MAPPER;
+
+ static {
+ YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+ YAML_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ YAML_MAPPER.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+ YAML_MAPPER.setSerializationInclusion(Include.NON_NULL);
+ }
+
private final File pluginDirectory;
+ private final String osName;
+
/**
* pluginName -> pluginDefinition
*/
private Map<String, PluginDefinition> pluginDefinitionMap = new HashMap<>();
- private ObjectMapper yamlMapper;
- private String osName;
private PluginClassLoader(URL url, ClassLoader parent, String osName) throws IOException {
super(new URL[]{url}, parent);
this.pluginDirectory = new File(url.getPath());
this.osName = osName;
- initYamlMapper();
loadPluginDefinition();
}
@@ -120,14 +129,6 @@ public class PluginClassLoader extends URLClassLoader {
return this.pluginDefinitionMap;
}
- private void initYamlMapper() {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
- mapper.setSerializationInclusion(Include.NON_NULL);
- this.yamlMapper = mapper;
- }
-
/**
* load pluginDefinition in **.jar/META-INF/plugin.yaml
*/
@@ -141,46 +142,46 @@ public class PluginClassLoader extends URLClassLoader {
List<PluginDefinition> definitions = new ArrayList<>();
for (File jarFile : files) {
if (!jarFile.getName().endsWith(".jar")) {
- log.warn("{} is not valid plugin jar, skip to load", jarFile);
+ log.warn("invalid plugin jar {}, skip to load", jarFile);
continue;
}
- log.info("{} is valid plugin jar, start to load", jarFile);
+
+ log.info("start to load valid plugin jar {}", jarFile);
JarFile pluginJar = new JarFile(jarFile);
String pluginDef = readPluginDef(pluginJar);
pluginDef = pluginDef.replaceAll("[\\x00]+", "");
- PluginDefinition definition = yamlMapper.readValue(pluginDef, PluginDefinition.class);
+ PluginDefinition definition = YAML_MAPPER.readValue(pluginDef, PluginDefinition.class);
+
if (osName.startsWith(WINDOWS_PREFIX)) {
- addURL(new URL("file:///" + jarFile.getAbsolutePath()));
+ super.addURL(new URL("file:///" + jarFile.getAbsolutePath()));
} else {
- addURL(new URL("file://" + jarFile.getAbsolutePath()));
+ super.addURL(new URL("file://" + jarFile.getAbsolutePath()));
}
checkPluginValid(jarFile, definition);
definitions.add(definition);
}
+
pluginDefinitionMap = definitions.stream()
.collect(Collectors.toMap(PluginDefinition::getName, definition -> definition));
}
private void checkPluginValid(File jarFile, PluginDefinition pluginDefinition) {
- if (StringUtils.isEmpty(pluginDefinition.getName())) {
- throw new RuntimeException(String.format("%s should define pluginName in plugin.yaml", jarFile.getName()));
- }
- if (StringUtils.isEmpty(pluginDefinition.getPluginClass())) {
- throw new RuntimeException(String.format("%s should define pluginClass in plugin.yaml", jarFile.getName()));
- }
- try {
- this.loadClass(pluginDefinition.getPluginClass());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(
- String.format("pluginClass '%s' could not be found in %s", pluginDefinition.getPluginClass(),
- jarFile.getName()));
- }
+ String info = "[%s] not defined in plugin.yaml for " + jarFile.getName();
+ Preconditions.checkNotEmpty(pluginDefinition.getName(), String.format(info, "name"));
+ Preconditions.checkNotEmpty(pluginDefinition.getJavaVersion(), String.format(info, "javaVersion"));
+ Preconditions.checkNotEmpty(pluginDefinition.getPluginClasses(), String.format(info, "pluginClasses"));
if (StringUtils.isEmpty(pluginDefinition.getDescription())) {
- log.warn(String.format("%s should define description in plugin.yaml", jarFile.getName()));
+ log.warn(String.format(info, "description"));
}
- if (StringUtils.isEmpty(pluginDefinition.getJavaVersion())) {
- throw new RuntimeException(String.format("%s should define javaVersion in plugin.yaml", jarFile.getName()));
+
+ for (String clazz : pluginDefinition.getPluginClasses()) {
+ try {
+ this.loadClass(clazz);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(String.format("plugin class %s not found in %s", clazz, jarFile.getName()));
+ }
}
+
JarHell.checkJavaVersion(pluginDefinition.getName(), pluginDefinition.getJavaVersion());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
index 9014304cc..1e23f6f76 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
@@ -22,9 +22,10 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginBinder;
-import org.apache.inlong.manager.workflow.plugin.PluginDefinition;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginBinder;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -47,8 +48,7 @@ import java.util.Map;
@Order(Ordered.LOWEST_PRECEDENCE)
public class PluginService implements InitializingBean {
-
- public static final String DEFAULT_PLUGIN_LOC = "plugins";
+ public static final String DEFAULT_PLUGIN_LOCATION = "plugins";
@Getter
private final List<Plugin> plugins = new ArrayList<>();
@@ -56,7 +56,7 @@ public class PluginService implements InitializingBean {
@Setter
@Getter
@Value("${plugin.location?:}")
- private String pluginLoc;
+ private String pluginLocation;
@Getter
@Autowired
@@ -67,10 +67,10 @@ public class PluginService implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
- if (StringUtils.isEmpty(pluginLoc)) {
- pluginLoc = DEFAULT_PLUGIN_LOC;
+ if (StringUtils.isEmpty(pluginLocation)) {
+ pluginLocation = DEFAULT_PLUGIN_LOCATION;
}
- log.info("pluginLoc:{}", pluginLoc);
+ log.info("plugin location is {}", pluginLocation);
pluginReload();
}
@@ -78,36 +78,41 @@ public class PluginService implements InitializingBean {
* Reload the plugin from the plugin path
*/
public void pluginReload() {
- Path path = Paths.get(pluginLoc).toAbsolutePath();
+ Path path = Paths.get(pluginLocation).toAbsolutePath();
log.info("search for plugin in {}", path);
if (!path.toFile().exists()) {
log.warn("plugin directory not found");
return;
}
+
PluginClassLoader pluginLoader = PluginClassLoader.getFromPluginUrl(path.toString(),
Thread.currentThread().getContextClassLoader());
Map<String, PluginDefinition> pluginDefinitions = pluginLoader.getPluginDefinitions();
if (MapUtils.isEmpty(pluginDefinitions)) {
- log.warn("pluginDefinition not found in {}", pluginLoc);
+ log.warn("plugin definition not found in {}", pluginLocation);
return;
}
+
List<Plugin> plugins = new ArrayList<>();
for (PluginDefinition pluginDefinition : pluginDefinitions.values()) {
- String pluginClassName = pluginDefinition.getPluginClass();
- try {
- Class<?> pluginClass = pluginLoader.loadClass(pluginClassName);
- Object plugin = pluginClass.getDeclaredConstructor().newInstance();
- plugins.add((Plugin) plugin);
- } catch (Throwable e) {
- throw new RuntimeException(e.getMessage());
+ List<String> classNames = pluginDefinition.getPluginClasses();
+ for (String name : classNames) {
+ try {
+ Class<?> pluginClass = pluginLoader.loadClass(name);
+ Object plugin = pluginClass.getDeclaredConstructor().newInstance();
+ plugins.add((Plugin) plugin);
+ } catch (Throwable e) {
+ log.error("create plugin instance error: ", e);
+ throw new BusinessException("create plugin instance error: " + e.getMessage());
+ }
}
}
this.plugins.addAll(plugins);
- for (PluginBinder pluginBinder : pluginBinders) {
+
+ for (PluginBinder binder : pluginBinders) {
for (Plugin plugin : plugins) {
- log.info(String.format("PluginBinder:%s load Plugin:%s",
- pluginBinder.getClass().getSimpleName(), plugin.getClass().getSimpleName()));
- pluginBinder.acceptPlugin(plugin);
+ binder.acceptPlugin(plugin);
+ log.info("plugin {} loaded by plugin binder {}", plugin.getClass(), binder.getClass());
}
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java
index adf2891c4..007805050 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginClassLoaderTest.java
@@ -18,13 +18,15 @@
package org.apache.inlong.manager.service.plugin;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.PluginDefinition;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.plugin.Plugin;
+import org.apache.inlong.manager.common.plugin.PluginDefinition;
import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
+import org.apache.inlong.manager.workflow.plugin.sort.PollerPlugin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -33,25 +35,33 @@ import java.util.Objects;
*/
public class PluginClassLoaderTest {
+ /**
+ * The test plugin jar was packaged from manager-plugins,
+ * naming it to `manager-plugin-example.jar`.
+ */
@Test
public void testLoadPlugin() {
-
String path = Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).getPath();
PluginClassLoader pluginClassLoader = PluginClassLoader.getFromPluginUrl(path + "plugins",
Thread.currentThread().getContextClassLoader());
Map<String, PluginDefinition> pluginDefinitionMap = pluginClassLoader.getPluginDefinitions();
Assertions.assertEquals(1, pluginDefinitionMap.size());
+
PluginDefinition pluginDefinition = Lists.newArrayList(pluginDefinitionMap.values()).get(0);
Assertions.assertNotNull(pluginDefinition);
- String pluginClass = pluginDefinition.getPluginClass();
- Assertions.assertTrue(StringUtils.isNotEmpty(pluginClass));
- try {
- Class cls = pluginClassLoader.loadClass(pluginClass);
- Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
- Assertions.assertTrue(plugin instanceof ProcessPlugin);
- } catch (Exception e) {
- Assertions.assertTrue(e instanceof ClassNotFoundException);
- Assertions.fail();
+ List<String> classNames = pluginDefinition.getPluginClasses();
+ Assertions.assertTrue(CollectionUtils.isNotEmpty(classNames));
+
+ for (String name : classNames) {
+ try {
+ Class<?> cls = pluginClassLoader.loadClass(name);
+ Plugin plugin = (Plugin) cls.getDeclaredConstructor().newInstance();
+ Assertions.assertTrue(plugin instanceof ProcessPlugin
+ || plugin instanceof PollerPlugin);
+ } catch (Exception e) {
+ Assertions.assertTrue(e instanceof ClassNotFoundException);
+ Assertions.fail();
+ }
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java
index c186e6cbd..684d74427 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/plugin/PluginServiceTest.java
@@ -17,17 +17,18 @@
package org.apache.inlong.manager.service.plugin;
+import org.apache.inlong.manager.common.plugin.Plugin;
import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.workflow.plugin.Plugin;
-import org.apache.inlong.manager.workflow.plugin.ProcessPlugin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
+import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.List;
+import java.util.Objects;
/**
* Test class for reload plugin.
@@ -43,15 +44,15 @@ public class PluginServiceTest extends ServiceBaseTest {
public void testReloadPlugin() {
String path = null;
try {
- path = Paths.get(this.getClass().getClassLoader().getResource("").toURI()).toString();
+ URI uri = Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).toURI();
+ path = Paths.get(uri).toString();
} catch (URISyntaxException e) {
Assertions.fail(e.getMessage());
}
- pluginService.setPluginLoc(path + File.separator + PLUGIN_NAME);
+ pluginService.setPluginLocation(path + File.separator + PLUGIN_NAME);
pluginService.pluginReload();
List<Plugin> pluginList = pluginService.getPlugins();
Assertions.assertTrue(pluginList.size() > 0);
- Assertions.assertTrue(pluginList.get(0) instanceof ProcessPlugin);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index bdb5a7a50..d671c9b9b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -19,9 +19,10 @@ package org.apache.inlong.manager.service.sort;
import com.google.common.collect.Lists;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -32,11 +33,10 @@ import org.apache.inlong.manager.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.mocks.MockPlugin;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.common.enums.ProcessName;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowServiceImplTest;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
@@ -57,7 +57,6 @@ import java.util.stream.Collectors;
*/
public class DisableZkForSortTest extends WorkflowServiceImplTest {
-
@Autowired
protected InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/test/resources/application.properties b/inlong-manager/manager-service/src/test/resources/application.properties
index 1512ce7e8..b16c38c79 100644
--- a/inlong-manager/manager-service/src/test/resources/application.properties
+++ b/inlong-manager/manager-service/src/test/resources/application.properties
@@ -15,7 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
# Encryption config, the suffix of value must be the same as the version.
inlong.encrypt.version=1
diff --git a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar
index 841b1189f..fa2aaf612 100644
Binary files a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar and b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar differ
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
index 7b0a39666..95db04f03 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongSortController.java
@@ -20,15 +20,17 @@ package org.apache.inlong.manager.web.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusRequest;
-import org.apache.inlong.manager.pojo.sort.ListSortStatusResponse;
+import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.service.core.SortService;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+
/**
* Inlong sort controller.
*/
@@ -40,9 +42,9 @@ public class InlongSortController {
@Autowired
private SortService sortService;
- @RequestMapping(value = "/sort/listStatus", method = RequestMethod.POST)
- @ApiOperation(value = "List sort job status by inlong groups")
- public Response<ListSortStatusResponse> listSortStatus(@RequestBody ListSortStatusRequest request) {
+ @PostMapping(value = "/sort/listStatus")
+ @ApiOperation(value = "List sort job status")
+ public Response<List<SortStatusInfo>> listSortStatus(@RequestBody SortStatusRequest request) {
return Response.success(sortService.listSortStatus(request));
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
index efbae225b..7e15d5276 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/ProcessPlugin.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.workflow.plugin;
+import org.apache.inlong.manager.common.plugin.Plugin;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -45,8 +46,4 @@ public interface ProcessPlugin extends Plugin {
return null;
}
- default SortStatusPoller createSortStatusPoller() {
- return null;
- }
-
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/PollerPlugin.java
similarity index 67%
copy from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
copy to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/PollerPlugin.java
index 295fbcff6..a59ed0060 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/PollerPlugin.java
@@ -15,17 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.workflow.plugin.sort;
-import org.apache.inlong.manager.common.enums.SortStatus;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.inlong.manager.common.plugin.Plugin;
/**
- * Sort task status poller interface for inlong groups
+ * Interface of Poller Plugin.
*/
-public interface SortStatusPoller {
- Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
+public interface PollerPlugin extends Plugin {
+
+ /**
+ * Get Sort poller instance.
+ *
+ * @return Sort poller instance
+ */
+ default SortPoller getSortPoller() {
+ return null;
+ }
+
}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
similarity index 61%
rename from inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
rename to inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
index 295fbcff6..68dd24285 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/SortStatusPoller.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/plugin/sort/SortPoller.java
@@ -15,17 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.workflow.plugin;
+package org.apache.inlong.manager.workflow.plugin.sort;
-import org.apache.inlong.manager.common.enums.SortStatus;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import java.util.List;
-import java.util.Map;
/**
- * Sort task status poller interface for inlong groups
+ * Plugin poller interface for extension
*/
-public interface SortStatusPoller {
- Map<String, SortStatus> poll(List<InlongGroupInfo> groupInfo, String credentials);
+public interface SortPoller {
+
+ /**
+ * Poll the Sort status infos by the given inlong groups
+ *
+ * @param groupInfos inlong group infos
+ * @param credentials credential info
+ * @return list of Sort status infos
+ * @throws Exception any exception if occurred
+ */
+ List<SortStatusInfo> pollSortStatus(List<InlongGroupInfo> groupInfos, String credentials) throws Exception;
+
}