You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/01/31 02:04:08 UTC

[inlong] branch master updated: [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a675d88a3 [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295)
a675d88a3 is described below

commit a675d88a34aae7864047477b510339838eb400b6
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Tue Jan 31 10:04:01 2023 +0800

    [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295)
---
 .../manager/plugin/listener/DeleteSortListener.java       | 12 +++++-------
 .../manager/plugin/listener/DeleteStreamListener.java     | 13 +++++--------
 .../manager/plugin/listener/RestartSortListener.java      | 14 +++++---------
 .../manager/plugin/listener/RestartStreamListener.java    | 15 +++++----------
 .../manager/plugin/listener/SuspendSortListener.java      | 14 +++++---------
 .../manager/plugin/listener/SuspendStreamListener.java    | 15 +++++----------
 6 files changed, 30 insertions(+), 53 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 7eae0e9ca..d898d5d83 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -89,15 +89,13 @@ public class DeleteSortListener implements SortOperateListener {
         Map<String, String> kvConf = new HashMap<>();
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId);
-            return ListenerResult.success();
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId=%s", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 64006410b..54d4f23b6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -94,16 +94,13 @@ public class DeleteStreamListener implements SortOperateListener {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty",
-                    groupId, streamId);
-            return ListenerResult.success();
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0f1fbe4ec..57238a33c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -90,17 +90,13 @@ public class RestartSortListener implements SortOperateListener {
         Map<String, String> kvConf = new HashMap<>();
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s]", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 5ca86cc53..322854476 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -99,18 +99,13 @@ public class RestartStreamListener implements SortOperateListener {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format(
-                    "restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
-                    groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index d8a17d2d5..fd68e364c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -89,17 +89,13 @@ public class SuspendSortListener implements SortOperateListener {
         Map<String, String> kvConf = new HashMap<>();
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s]", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 672590d67..b3f7821c1 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -95,18 +95,13 @@ public class SuspendStreamListener implements SortOperateListener {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format(
-                    "suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty",
-                    groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);