You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 05:03:21 UTC

[inlong] branch release-1.3.0 updated (9fe9c54b1 -> 555d2ddbe)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 9fe9c54b1 [INLONG-5290][Manager] Optimize the objects returned by paging queries (#5558)
     new 7779926f8 [INLONG-5039][Manager] Support delete and update command, and query execution logs for manager client (#5519)
     new e625854f1 [INLONG-5696][Dashboard] EditableTable value update error when columns change (#5697)
     new 24bb64664 [INLONG-5698][Manager] Fixed the DataProxy cluster tag was restored after the manager was restarted (#5699)
     new edc0ae219 [INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)
     new 62fd18f8e [INLONG-5705][Manager] Fix the deprecated dependency of PageInfo (#5704)
     new b6151c682 [INLONG-5694][Manager] Fix the problem that gets the inlong group error (#5695)
     new 9b9c3b34b [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
     new 555d2ddbe [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/components/EditableTable/index.tsx         | 10 +--
 .../inlong/manager/client/cli/CommandToolMain.java |  5 +-
 .../inlong/manager/client/cli/CreateCommand.java   | 23 ++++--
 .../inlong/manager/client/cli/DeleteCommand.java   | 66 ++++++++++++++++
 .../inlong/manager/client/cli/ListCommand.java     |  2 +-
 .../inlong/manager/client/cli/LogCommand.java      | 87 ++++++++++++++++++++++
 .../cli/{CreateCommand.java => UpdateCommand.java} | 56 +++++++-------
 .../inlong/manager/client/cli/pojo/GroupInfo.java  |  2 +
 .../inlong/manager/client/cli/pojo/SinkInfo.java   |  2 +
 .../inlong/manager/client/cli/pojo/SourceInfo.java |  2 +
 .../inlong/manager/client/cli/pojo/StreamInfo.java |  2 +
 .../manager/client/cli/util/ClientUtils.java       |  5 +-
 .../inlong/manager/client/cli/TestCommand.java     | 25 ++++++-
 .../src/test/resources/create_group.json           | 29 +++-----
 .../src/test/resources/test_config.json            |  9 +++
 .../dao/mapper/InlongClusterEntityMapper.java      |  2 +
 .../mappers/InlongClusterEntityMapper.xml          | 15 ++++
 .../plugin/listener/DeleteSortListener.java        | 12 +--
 .../plugin/listener/DeleteStreamListener.java      | 12 +--
 .../plugin/listener/RestartSortListener.java       | 14 ++--
 .../plugin/listener/RestartStreamListener.java     | 14 ++--
 .../plugin/listener/StartupStreamListener.java     | 14 ++--
 .../plugin/listener/SuspendSortListener.java       | 12 +--
 .../plugin/listener/SuspendStreamListener.java     | 12 +--
 .../manager/pojo/sort/util/ExtractNodeUtils.java   |  5 +-
 .../pojo/source/autopush/AutoPushSource.java       |  9 +++
 .../pojo/source/autopush/AutoPushSourceDTO.java    |  9 +++
 .../source/autopush/AutoPushSourceRequest.java     | 11 +++
 .../manager/pojo/source/kafka/KafkaSource.java     |  9 +++
 .../manager/pojo/source/kafka/KafkaSourceDTO.java  |  9 +++
 .../pojo/source/kafka/KafkaSourceRequest.java      | 11 +++
 .../manager/pojo/source/pulsar/PulsarSource.java   |  9 +++
 .../pojo/source/pulsar/PulsarSourceDTO.java        |  9 +++
 .../pojo/source/pulsar/PulsarSourceRequest.java    | 11 +++
 .../service/core/heartbeat/HeartbeatManager.java   | 11 +--
 .../service/core/impl/ConsumptionServiceImpl.java  |  8 +-
 .../service/core/impl/SortSourceServiceImpl.java   | 17 +++--
 .../service/group/InlongGroupServiceImpl.java      |  5 +-
 .../source/pulsar/PulsarSourceOperator.java        |  7 ++
 .../core/impl/WorkflowQueryServiceImpl.java        |  5 +-
 .../sort/protocol/node/format/CsvFormat.java       |  5 ++
 .../tubemq-client-cpp/src/baseconsumer.cc          | 40 ++++++++--
 42 files changed, 474 insertions(+), 148 deletions(-)
 create mode 100644 inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
 create mode 100644 inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java
 copy inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/{CreateCommand.java => UpdateCommand.java} (50%)
 create mode 100644 inlong-manager/manager-client-tools/src/test/resources/test_config.json


[inlong] 01/08: [INLONG-5039][Manager] Support delete and update command, and query execution logs for manager client (#5519)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7779926f81019ba27104e72f6632fbdbc0828d43
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Thu Aug 25 17:05:37 2022 +0800

    [INLONG-5039][Manager] Support delete and update command, and query execution logs for manager client (#5519)
---
 .../inlong/manager/client/cli/CommandToolMain.java |  5 +-
 .../inlong/manager/client/cli/CreateCommand.java   | 23 ++++--
 .../inlong/manager/client/cli/DeleteCommand.java   | 66 ++++++++++++++++
 .../inlong/manager/client/cli/LogCommand.java      | 87 ++++++++++++++++++++++
 .../cli/{CreateCommand.java => UpdateCommand.java} | 56 +++++++-------
 .../inlong/manager/client/cli/pojo/GroupInfo.java  |  2 +
 .../inlong/manager/client/cli/pojo/SinkInfo.java   |  2 +
 .../inlong/manager/client/cli/pojo/SourceInfo.java |  2 +
 .../inlong/manager/client/cli/pojo/StreamInfo.java |  2 +
 .../manager/client/cli/util/ClientUtils.java       |  5 +-
 .../inlong/manager/client/cli/TestCommand.java     | 25 ++++++-
 .../src/test/resources/create_group.json           | 29 +++-----
 .../src/test/resources/test_config.json            |  9 +++
 13 files changed, 249 insertions(+), 64 deletions(-)

diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
index 271016a4f..e8da1bb42 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
 public class CommandToolMain {
 
     private final JCommander jcommander;
-
     @Parameter(names = {"-h", "--help"}, help = true, description = "Get all command about managerctl.")
     boolean help;
 
@@ -36,10 +35,12 @@ public class CommandToolMain {
         jcommander = new JCommander();
         jcommander.setProgramName("managerctl");
         jcommander.addObject(this);
-
         jcommander.addCommand("list", new ListCommand());
         jcommander.addCommand("describe", new DescribeCommand());
         jcommander.addCommand("create", new CreateCommand());
+        jcommander.addCommand("delete", new DeleteCommand());
+        jcommander.addCommand("update", new UpdateCommand());
+        jcommander.addCommand("log", new LogCommand());
     }
 
     public static void main(String[] args) {
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 0967c66d0..2a099f958 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -51,26 +51,37 @@ public class CreateCommand extends AbstractCommand {
 
         @Parameter(names = {"-f", "--file"},
                 converter = FileConverter.class,
-                required = true,
                 description = "json file")
         private File file;
 
+        @Parameter(names = {"-s"}, description = "optional log string to create file")
+        private String input;
+
         @Override
         void run() {
             try {
-                String fileContent = ClientUtils.readFile(file);
-                if (StringUtils.isBlank(fileContent)) {
-                    System.out.println("Create group failed: file was empty!");
-                    return;
+                String content;
+                if (input != null) {
+                    content = input;
+                } else {
+                    content = ClientUtils.readFile(file);
+                    if (StringUtils.isBlank(content)) {
+                        System.out.println("Create group failed: file was empty!");
+                        return;
+                    }
                 }
-                CreateGroupConf groupConf = objectMapper.readValue(fileContent, CreateGroupConf.class);
+                // first extract groupconfig from the file passed in
+                CreateGroupConf groupConf = objectMapper.readValue(content, CreateGroupConf.class);
+                // get the corresponding inlong group, a.k.a. the task to execute
                 InlongClient inlongClient = ClientUtils.getClient();
                 InlongGroup group = inlongClient.forGroup(groupConf.getGroupInfo());
                 InlongStreamBuilder streamBuilder = group.createStream(groupConf.getStreamInfo());
+                // put in parameters:source and sink,stream fields, then initialize
                 streamBuilder.fields(groupConf.getStreamFieldList());
                 streamBuilder.source(groupConf.getStreamSource());
                 streamBuilder.sink(groupConf.getStreamSink());
                 streamBuilder.initOrUpdate();
+                // initialize the new stream group
                 group.init();
                 System.out.println("Create group success!");
             } catch (Exception e) {
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
new file mode 100644
index 000000000..e7746deb2
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+
+import java.util.List;
+
+/**
+ * The delete command, used for deleting inlong groups through its "group" sub-command.
+ * The group to delete is selected by its inlong group id (--group / -g).
+ */
+@Parameters(commandDescription = "Delete resource by inlong group id")
+public class DeleteCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    public DeleteCommand() {
+        super("delete");
+        jcommander.addCommand("group", new DeleteCommand.DeleteGroup());
+    }
+
+    @Parameters(commandDescription = "Delete group by group id")
+    private static class DeleteGroup extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"--group", "-g"}, required = true, description = "inlong group id")
+        private String inlongGroupId;
+
+        @Override
+        void run() {
+            // look up the group by its id and delete it; failures are reported on stdout below
+            // TODO: handle and/or classify the exceptions
+            try {
+                InlongClient inlongClient = ClientUtils.getClient();
+                InlongGroup group = inlongClient.getGroup(inlongGroupId);
+                group.delete();
+                System.out.println("delete group success");
+            } catch (Exception e) {
+                System.out.format("Delete group failed! message: %s \n", e.getMessage());
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java
new file mode 100644
index 000000000..dd6a8c15a
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.client.cli.util.PrintUtils;
+import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
+
+import java.util.List;
+
+/**
+ * The log command queries inlong groups by a "key:value" filter and prints the matching groups.
+ */
+@Parameters(commandDescription = "Log resource")
+public class LogCommand extends AbstractCommand {
+
+    @Parameter()
+    private List<String> params;
+
+    public LogCommand() {
+        super("log");
+        jcommander.addCommand("group", new LogGroup());
+    }
+
+    @Parameters(commandDescription = "Log group")
+    private static class LogGroup extends AbstractCommandRunner {
+
+        @Parameter()
+        private List<String> params;
+
+        @Parameter(names = {"--query"}, required = true, description = "condition filters")
+        private String input;
+
+        @Override
+        void run() {
+            final int MAX_LOG_SIZE = 100;
+            try {
+                // for now only filter by one condition. TODO:support OR and AND, make a condition filter.
+                // sample input: inlongGroupId:test_group (only the second ':'-separated token is used)
+                if (StringUtils.isBlank(input)) {
+                    System.err.println("input cannot be empty, for example: inlongGroupId:test_group");
+                    return;
+                }
+                String[] inputs = input.split(":");
+                if (inputs.length < 2 || StringUtils.isBlank(inputs[1])) {
+                    System.err.println("the input must contain ':'");
+                    return;
+                }
+                // the value part of the filter is sent as the keyword of the group page request
+                ClientUtils.initClientFactory();
+                InlongGroupClient groupClient = ClientUtils.clientFactory.getGroupClient();
+                InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
+                pageRequest.setKeyword(inputs[1]);
+                PageInfo<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(pageRequest);
+                if (pageInfo.getSize() > MAX_LOG_SIZE) {
+                    System.err.println("the log is too large to print, please change the filter condition");
+                    return;
+                }
+                PrintUtils.print(pageInfo.getList(), GroupInfo.class);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java
similarity index 50%
copy from inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
copy to inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java
index 0967c66d0..6e7ad8ec5 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/UpdateCommand.java
@@ -19,63 +19,59 @@ package org.apache.inlong.manager.client.cli;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import com.beust.jcommander.converters.FileConverter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
-import org.apache.inlong.manager.client.api.InlongStreamBuilder;
-import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.pojo.sort.BaseSortConf;
 
 import java.io.File;
+import java.util.List;
 
 /**
- * Create resource by json file.
+ * The update command used to change the fields of inlong groups.
+ * Please refer to the document for parameters
  */
-@Parameters(commandDescription = "Create resource by json file")
-public class CreateCommand extends AbstractCommand {
+@Parameters(commandDescription = "Update resource by json file")
+public class UpdateCommand extends AbstractCommand {
 
     @Parameter()
-    private java.util.List<String> params;
+    private List<String> params;
 
-    public CreateCommand() {
-        super("create");
-        jcommander.addCommand("group", new CreateGroup());
+    public UpdateCommand() {
+        super("update");
+        jcommander.addCommand("group", new UpdateCommand.UpdateGroup());
     }
 
-    @Parameters(commandDescription = "Create group by json file")
-    private static class CreateGroup extends AbstractCommandRunner {
+    @Parameters(commandDescription = "Update group by json file")
+    private static class UpdateGroup extends AbstractCommandRunner {
 
         @Parameter()
-        private java.util.List<String> params;
+        private List<String> params;
 
-        @Parameter(names = {"-f", "--file"},
-                converter = FileConverter.class,
-                required = true,
-                description = "json file")
+        @Parameter(names = {"--group", "-g"}, required = true, description = "inlong group id")
+        private String inlongGroupId;
+
+        @Parameter(names = {"-c", "--config"},
+                required = true, description = "json file")
         private File file;
 
         @Override
         void run() {
             try {
+                InlongClient inlongClient = ClientUtils.getClient();
+                InlongGroup group = inlongClient.getGroup(inlongGroupId);
                 String fileContent = ClientUtils.readFile(file);
                 if (StringUtils.isBlank(fileContent)) {
-                    System.out.println("Create group failed: file was empty!");
+                    System.out.println("Update group failed: file was empty!");
                     return;
                 }
-                CreateGroupConf groupConf = objectMapper.readValue(fileContent, CreateGroupConf.class);
-                InlongClient inlongClient = ClientUtils.getClient();
-                InlongGroup group = inlongClient.forGroup(groupConf.getGroupInfo());
-                InlongStreamBuilder streamBuilder = group.createStream(groupConf.getStreamInfo());
-                streamBuilder.fields(groupConf.getStreamFieldList());
-                streamBuilder.source(groupConf.getStreamSource());
-                streamBuilder.sink(groupConf.getStreamSink());
-                streamBuilder.initOrUpdate();
-                group.init();
-                System.out.println("Create group success!");
+                // first extract groupconfig from the file passed in
+                BaseSortConf sortConf = objectMapper.readValue(fileContent, BaseSortConf.class);
+                group.update(sortConf);
+                System.out.println("update group success");
             } catch (Exception e) {
-                System.out.println("Create group failed!");
-                System.out.println(e.getMessage());
+                e.printStackTrace();
             }
         }
     }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/GroupInfo.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/GroupInfo.java
index 27f4b4b0f..a85322955 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/GroupInfo.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/GroupInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.cli.pojo;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.Data;
 import org.apache.inlong.manager.client.cli.util.ParseStatus;
 
@@ -34,6 +35,7 @@ public class GroupInfo {
 
     @ParseStatus
     private String status;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
 }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SinkInfo.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SinkInfo.java
index 2804be3f1..a714eefc2 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SinkInfo.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SinkInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.cli.pojo;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.Data;
 import org.apache.inlong.manager.client.cli.util.ParseStatus;
 
@@ -36,5 +37,6 @@ public class SinkInfo {
 
     @ParseStatus
     private String status;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SourceInfo.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SourceInfo.java
index ebe62ae37..131bc95d3 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SourceInfo.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/SourceInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.cli.pojo;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.Data;
 import org.apache.inlong.manager.client.cli.util.ParseStatus;
 
@@ -37,5 +38,6 @@ public class SourceInfo {
 
     @ParseStatus
     private String status;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/StreamInfo.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/StreamInfo.java
index df48dcfab..4c9292d0b 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/StreamInfo.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/StreamInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.cli.pojo;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.Data;
 import org.apache.inlong.manager.client.cli.util.ParseStatus;
 
@@ -37,5 +38,6 @@ public class StreamInfo {
 
     @ParseStatus
     private String status;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 }
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
index 5ade5e425..133113b28 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
@@ -38,13 +38,10 @@ import java.util.Properties;
 public class ClientUtils {
 
     private static final String CONFIG_FILE = "application.properties";
-
+    public static ClientFactory clientFactory;
     private static ClientConfiguration configuration;
-
     private static String serviceUrl;
 
-    public static ClientFactory clientFactory;
-
     /**
      * Get an inlong client instance.
      */
diff --git a/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java b/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
index 79e198aba..3f6f3cbe0 100644
--- a/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
+++ b/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
@@ -34,21 +34,40 @@ public class TestCommand {
         log.info("client tools cannot run the unit tests, as the application.properties not exist");
     }
 
-    // @Test
+    @Test
     public void testListGroup() {
         String[] arg = {"list", "group"};
         Assertions.assertTrue(inlongAdminTool.run(arg));
     }
 
-    // @Test
+    @Test
     public void testDescribeGroup() {
         String[] arg = {"describe", "group", "-g", "test", "-s", "130"};
         Assertions.assertTrue(inlongAdminTool.run(arg));
     }
 
-    // @Test
+    @Test
     public void testCreateGroup() {
         String[] arg = {"create", "group", "-f", "src/test/resources/create_group.json"};
         Assertions.assertTrue(inlongAdminTool.run(arg));
     }
+
+    @Test
+    public void testDeleteGroup() throws Exception {
+        String[] arg = {"delete", "group", "-g", "test_group"};
+        Assertions.assertTrue(inlongAdminTool.run(arg));
+    }
+
+    @Test
+    public void testUpdateGroup() throws Exception {
+        String[] arg = {"update", "group", "-g", "test_group", "-c", "src/test/resources/test_config.json"};
+        Assertions.assertTrue(inlongAdminTool.run(arg));
+    }
+
+    @Test
+    public void testLogGroup() throws Exception {
+        String[] arg = {"log", "group", "--query", "inlongGroupId:test_group"};
+        Assertions.assertTrue(inlongAdminTool.run(arg));
+    }
+
 }
diff --git a/inlong-manager/manager-client-tools/src/test/resources/create_group.json b/inlong-manager/manager-client-tools/src/test/resources/create_group.json
index e91ed573c..c9f2b0395 100644
--- a/inlong-manager/manager-client-tools/src/test/resources/create_group.json
+++ b/inlong-manager/manager-client-tools/src/test/resources/create_group.json
@@ -13,11 +13,7 @@
     "retentionTimeUnit": "hours",
     "retentionSize": -1,
     "retentionSizeUnit": "MB",
-    "inlongClusterTag": "default_cluster",
-    "sortConf": {
-      "sortType": "flink",
-      "serviceUrl": "127.0.0.1"
-    }
+    "inlongClusterTag": "default_cluster"
   },
   "streamInfo": {
     "dataType": "CSV",
@@ -41,21 +37,16 @@
   "streamSource": {
     "inlongStreamId": "test_stream",
     "sourceName": "source_name",
-    "hostname": "127.0.0.1",
-    "port": 3306,
-    "user": "root",
-    "password": "123456",
-    "historyFilename": "/data/inlong-agent/.history",
-    "serverTimezone": "UTC",
-    "intervalMs": 1000,
-    "allMigration": false,
-    "databaseWhiteList": "etl",
-    "tableWhiteList": "person",
     "inlongGroupId": "test_group",
-    "sourceType": "BINLOG",
-    "specificOffsetFile": "mysql-bin.000001",
-    "specificOffsetPos": 6972,
-    "snapshotMode": "schema_only_recovery"
+    "sourceType": "FILE",
+    "agentIp": "127.0.0.1",
+    "uuid": "127.0.0.1",
+    "clusterId": "123",
+    "inlongClusterName": "hi",
+    "dataNodeName": "hi2",
+    "serializationType": "csv",
+    "snapshot": "hi3",
+    "properties": {}
   },
   "streamSink": {
     "inlongStreamId": "test_stream",
diff --git a/inlong-manager/manager-client-tools/src/test/resources/test_config.json b/inlong-manager/manager-client-tools/src/test/resources/test_config.json
new file mode 100644
index 000000000..56fc72a19
--- /dev/null
+++ b/inlong-manager/manager-client-tools/src/test/resources/test_config.json
@@ -0,0 +1,9 @@
+{
+  "FlinkSortConf": {
+    "sortType": "flink",
+    "authentication": "NONE",
+    "serviceUrl": "127.0.0.1:8123",
+    "region": "beijing",
+    "properties": {}
+  }
+}


[inlong] 04/08: [INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit edc0ae2190dddea89301a5001051d70bb1ac5014
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Thu Aug 25 17:39:38 2022 +0800

    [INLONG-5689][Manager] PulsarSource set fieldDelimiter when use CSV format (#5690)
---
 .../org/apache/inlong/manager/common/consts/InlongConstants.java     | 5 +++++
 .../org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java   | 5 ++++-
 .../inlong/manager/service/source/pulsar/PulsarSourceOperator.java   | 4 ++++
 .../java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java  | 5 +++++
 4 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 28c2435d2..5dc922009 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -110,4 +110,9 @@ public class InlongConstants {
 
     public static final String SORT_PROPERTIES = "sort.properties";
 
+    /**
+     * common config
+     */
+    public static final String FIELD_DELIMITER = "fieldDelimiter";
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 63ee044b6..626db78c6 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
@@ -231,7 +232,9 @@ public class ExtractNodeUtils {
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
         switch (dataType) {
             case CSV:
-                format = new CsvFormat();
+                String fieldDelimiter = (String) pulsarSource.getProperties()
+                        .get(InlongConstants.FIELD_DELIMITER);
+                format = StringUtils.isBlank(fieldDelimiter) ? new CsvFormat() : new CsvFormat(fieldDelimiter);
                 break;
             case AVRO:
                 format = new AvroFormat();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 03fb7f96b..1bc490fda 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -151,6 +151,10 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
             if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
                 pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
             }
+            if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
+                Map<String, Object> properties = pulsarSource.getProperties();
+                properties.put(InlongConstants.FIELD_DELIMITER, streamInfo.getDataSeparator());
+            }
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
index 60cfba67b..ae8aa763c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
@@ -79,6 +79,11 @@ public class CsvFormat implements Format {
         this(",", true, null, false, true, ";", null, null);
     }
 
+    @JsonCreator
+    public CsvFormat(String fieldDelimiter) {
+        this(fieldDelimiter, true, null, false, true, ";", null, null);
+    }
+
     @JsonIgnore
     @Override
     public String getFormat() {


[inlong] 05/08: [INLONG-5705][Manager] Fix the deprecated dependency of PageInfo (#5704)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 62fd18f8e4fd5fd8c635dcaa24d1d0c7097c2141
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Thu Aug 25 20:32:21 2022 +0800

    [INLONG-5705][Manager] Fix the deprecated dependency of PageInfo (#5704)
---
 .../java/org/apache/inlong/manager/client/cli/ListCommand.java    | 2 +-
 .../java/org/apache/inlong/manager/client/cli/LogCommand.java     | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index c313746c7..5db28cb72 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.cli;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
 import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
@@ -30,6 +29,7 @@ import org.apache.inlong.manager.client.cli.pojo.SourceInfo;
 import org.apache.inlong.manager.client.cli.pojo.StreamInfo;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
 import org.apache.inlong.manager.client.cli.util.PrintUtils;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java
index dd6a8c15a..6d4fc164c 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/LogCommand.java
@@ -19,12 +19,12 @@ package org.apache.inlong.manager.client.cli;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
-import com.github.pagehelper.PageInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
 import org.apache.inlong.manager.client.cli.util.PrintUtils;
+import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 
@@ -73,12 +73,12 @@ public class LogCommand extends AbstractCommand {
                 InlongGroupClient groupClient = ClientUtils.clientFactory.getGroupClient();
                 InlongGroupPageRequest pageRequest = new InlongGroupPageRequest();
                 pageRequest.setKeyword(inputs[1]);
-                PageInfo<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(pageRequest);
-                if (pageInfo.getSize() > MAX_LOG_SIZE) {
+                PageResult<InlongGroupBriefInfo> pageResult = groupClient.listGroups(pageRequest);
+                if (pageResult.getPageSize() > MAX_LOG_SIZE) {
                     System.err.println("the log is too large to print, please change the filter condition");
                     return;
                 }
-                PrintUtils.print(pageInfo.getList(), GroupInfo.class);
+                PrintUtils.print(pageResult.getList(), GroupInfo.class);
             } catch (Exception e) {
                 e.printStackTrace();
             }


[inlong] 03/08: [INLONG-5698][Manager] Fixed the DataProxy cluster tag was restored after the manager was restarted (#5699)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 24bb64664a1eecc98a292f148797c13a69e74717
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Aug 25 17:38:15 2022 +0800

    [INLONG-5698][Manager] Fixed the DataProxy cluster tag was restored after the manager was restarted (#5699)
---
 .../manager/dao/mapper/InlongClusterEntityMapper.java     |  2 ++
 .../main/resources/mappers/InlongClusterEntityMapper.xml  | 15 +++++++++++++++
 .../manager/service/core/heartbeat/HeartbeatManager.java  | 11 ++++-------
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index d1c7c40ed..36ca245df 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -40,6 +40,8 @@ public interface InlongClusterEntityMapper {
     List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @Param("name") String name,
             @Param("type") String type);
 
+    InlongClusterEntity selectByNameAndType(@Param("name") String name, @Param("type") String type);
+
     List<InlongClusterEntity> selectByCondition(ClusterPageRequest request);
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index 1df48fbcb..ca518dad6 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -105,6 +105,21 @@
         </where>
         order by modify_time desc
     </select>
+    <select id="selectByNameAndType" resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_cluster
+        <where>
+            is_deleted = 0
+            <if test="type != null and type != ''">
+                and type = #{type, jdbcType=VARCHAR}
+            </if>
+            <if test="name != null and name != ''">
+                and name = #{name, jdbcType=VARCHAR}
+            </if>
+        </where>
+        order by modify_time desc
+    </select>
     <select id="selectByCondition"
             parameterType="org.apache.inlong.manager.pojo.cluster.ClusterPageRequest"
             resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
index 8353842ae..6027fa908 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
@@ -24,7 +24,6 @@ import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.Scheduler;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
@@ -44,7 +43,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
@@ -158,12 +156,11 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
         final String clusterName = componentHeartbeat.getClusterName();
         final String type = componentHeartbeat.getComponentType();
         final String clusterTag = componentHeartbeat.getClusterTag();
-        List<InlongClusterEntity> entities = clusterMapper.selectByKey(clusterTag, clusterName, type);
-        if (CollectionUtils.isNotEmpty(entities)) {
+        InlongClusterEntity entity = clusterMapper.selectByNameAndType(clusterName, type);
+        if (null != entity) {
             // TODO Load balancing needs to be considered.
-            InlongClusterEntity cluster = entities.get(0);
-            InlongClusterOperator operator = clusterOperatorFactory.getInstance(cluster.getType());
-            return operator.getFromEntity(cluster);
+            InlongClusterOperator operator = clusterOperatorFactory.getInstance(entity.getType());
+            return operator.getFromEntity(entity);
         }
 
         InlongClusterEntity cluster = new InlongClusterEntity();


[inlong] 02/08: [INLONG-5696][Dashboard] EditableTable value update error when columns change (#5697)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e625854f186939127fbf15b8b0a49ac78b4dd84a
Author: Daniel <le...@apache.org>
AuthorDate: Thu Aug 25 17:29:49 2022 +0800

    [INLONG-5696][Dashboard] EditableTable value update error when columns change (#5697)
---
 inlong-dashboard/src/components/EditableTable/index.tsx | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/inlong-dashboard/src/components/EditableTable/index.tsx b/inlong-dashboard/src/components/EditableTable/index.tsx
index 84b432160..ee4cc5885 100644
--- a/inlong-dashboard/src/components/EditableTable/index.tsx
+++ b/inlong-dashboard/src/components/EditableTable/index.tsx
@@ -73,7 +73,7 @@ const getRowInitialValue = (columns: EditableTableProps['columns']) =>
       [cur.dataIndex]: cur.initialValue,
     }),
     {
-      _etid: Math.random().toString(),
+      _etid: `_etnew_${Math.random().toString()}`, // The tag of new.
     },
   );
 
@@ -104,13 +104,7 @@ const Comp = ({
   const { t } = useTranslation();
 
   const [data, setData] = useState<RecordType[]>(
-    addIdToValues(value) ||
-      (required
-        ? [getRowInitialValue(columns)].map(item => ({
-            ...item,
-            _etid: `_etnew_${item._etid}`, // The tag of new.
-          }))
-        : []),
+    addIdToValues(value) || (required ? [getRowInitialValue(columns)] : []),
   );
 
   const [colsSet, setColsSet] = useState(new Set(columns.map(item => item.dataIndex)));


[inlong] 07/08: [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9b9c3b34b01c70977bcf85ec4da58361218deda5
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:52:12 2022 +0800

    [INLONG-5703][Manager] Add separator-related fields for some sources (#5706)
---
 .../apache/inlong/manager/common/consts/InlongConstants.java  |  5 -----
 .../inlong/manager/pojo/sort/util/ExtractNodeUtils.java       |  8 ++++----
 .../inlong/manager/pojo/source/autopush/AutoPushSource.java   |  9 +++++++++
 .../manager/pojo/source/autopush/AutoPushSourceDTO.java       |  9 +++++++++
 .../manager/pojo/source/autopush/AutoPushSourceRequest.java   | 11 +++++++++++
 .../apache/inlong/manager/pojo/source/kafka/KafkaSource.java  |  9 +++++++++
 .../inlong/manager/pojo/source/kafka/KafkaSourceDTO.java      |  9 +++++++++
 .../inlong/manager/pojo/source/kafka/KafkaSourceRequest.java  | 11 +++++++++++
 .../inlong/manager/pojo/source/pulsar/PulsarSource.java       |  9 +++++++++
 .../inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java    |  9 +++++++++
 .../manager/pojo/source/pulsar/PulsarSourceRequest.java       | 11 +++++++++++
 .../manager/service/source/pulsar/PulsarSourceOperator.java   |  7 +++++--
 12 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 5dc922009..28c2435d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -110,9 +110,4 @@ public class InlongConstants {
 
     public static final String SORT_PROPERTIES = "sort.properties";
 
-    /**
-     * common config
-     */
-    public static final String FIELD_DELIMITER = "fieldDelimiter";
-
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 626db78c6..c94f84af4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
@@ -232,9 +232,9 @@ public class ExtractNodeUtils {
         DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
         switch (dataType) {
             case CSV:
-                String fieldDelimiter = (String) pulsarSource.getProperties()
-                        .get(InlongConstants.FIELD_DELIMITER);
-                format = StringUtils.isBlank(fieldDelimiter) ? new CsvFormat() : new CsvFormat(fieldDelimiter);
+                String separator = DataSeparator
+                        .forAscii(Integer.parseInt(pulsarSource.getDataSeparator())).getSeparator();
+                format = new CsvFormat(separator);
                 break;
             case AVRO:
                 format = new AvroFormat();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
index d67a92ee1..ebd23d5fc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSource.java
@@ -45,6 +45,15 @@ public class AutoPushSource extends StreamSource {
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public AutoPushSource() {
         this.setSourceType(SourceType.AUTO_PUSH);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
index 1077230f8..baa0c872c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceDTO.java
@@ -43,6 +43,15 @@ public class AutoPushSourceDTO {
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public static AutoPushSourceDTO getFromRequest(AutoPushSourceRequest request) {
         return AutoPushSourceDTO.builder()
                 .dataProxyGroup(request.getDataProxyGroup())
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
index c8b8010d2..6ee58efd0 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/autopush/AutoPushSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.autopush;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -39,6 +41,15 @@ public class AutoPushSourceRequest extends SourceRequest {
     @ApiModelProperty(value = "DataProxy group name, used when the user enables local configuration")
     private String dataProxyGroup;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public AutoPushSourceRequest() {
         this.setSourceType(SourceType.AUTO_PUSH);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index 2c9ca8004..74d480b35 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -81,6 +81,15 @@ public class KafkaSource extends StreamSource {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public KafkaSource() {
         this.setSourceType(SourceType.KAFKA);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
index 70d72cccc..e81c373a7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
@@ -90,6 +90,15 @@ public class KafkaSourceDTO {
     @ApiModelProperty("Field needed when serializationType is csv,json,avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Properties for Kafka")
     private Map<String, Object> properties;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index 2934fd83a..120567218 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.kafka;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -77,6 +79,15 @@ public class KafkaSourceRequest extends SourceRequest {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     public KafkaSourceRequest() {
         this.setSourceType(SourceType.KAFKA);
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 763aefb24..e1be7d1cb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -62,6 +62,15 @@ public class PulsarSource extends StreamSource {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Configure the Source's startup mode. "
             + "Available options are earliest, latest, external-subscription, and specific-offsets.")
     @Builder.Default
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 602b6f74b..17c540a88 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -59,6 +59,15 @@ public class PulsarSourceDTO {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding;
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Configure the Source's startup mode. "
             + "Available options are earliest, latest, external-subscription, and specific-offsets.")
     @Builder.Default
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 497ea2240..e883e1caa 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.pojo.source.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.nio.charset.StandardCharsets;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 
@@ -54,6 +56,15 @@ public class PulsarSourceRequest extends SourceRequest {
     @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
     private String primaryKey;
 
+    @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    private String dataEncoding = StandardCharsets.UTF_8.toString();
+
+    @ApiModelProperty(value = "Data separator, stored as ASCII code")
+    private String dataSeparator = DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
+
     @ApiModelProperty("Configure the Source's startup mode."
             + " Available options are earliest, latest, external-subscription, and specific-offsets.")
     private String scanStartupMode = "earliest";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 1bc490fda..0b079c269 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -26,6 +26,7 @@ import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -152,8 +153,10 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
                 pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
             }
             if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
-                Map<String, Object> properties = pulsarSource.getProperties();
-                properties.put(InlongConstants.FIELD_DELIMITER, streamInfo.getDataSeparator());
+                pulsarSource.setDataSeparator(streamInfo.getDataSeparator());
+                if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
+                    pulsarSource.setDataSeparator(DataSeparator.COMMA.getAsciiCode().toString());
+                }
             }
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());


[inlong] 06/08: [INLONG-5694][Manager] Fix the problem that gets the inlong group error (#5695)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b6151c6827ec0198b8bcf97dfc93e2d0cccbda4a
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:51:20 2022 +0800

    [INLONG-5694][Manager] Fix the problem that gets the inlong group error (#5695)
---
 .../manager/plugin/listener/DeleteSortListener.java     | 12 ++++++------
 .../manager/plugin/listener/DeleteStreamListener.java   | 12 ++++++------
 .../manager/plugin/listener/RestartSortListener.java    | 14 +++++++-------
 .../manager/plugin/listener/RestartStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/StartupStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/SuspendSortListener.java    | 12 ++++++------
 .../manager/plugin/listener/SuspendStreamListener.java  | 12 ++++++------
 .../service/core/impl/ConsumptionServiceImpl.java       |  8 ++++----
 .../service/core/impl/SortSourceServiceImpl.java        | 17 +++++++++--------
 .../manager/service/group/InlongGroupServiceImpl.java   |  5 +++--
 .../workflow/core/impl/WorkflowQueryServiceImpl.java    |  5 +++--
 11 files changed, 64 insertions(+), 61 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index f2dd23537..227674f6e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -88,8 +88,8 @@ public class DeleteSortListener implements SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index d54c6aa56..96c856781 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class DeleteStreamListener implements SortOperateListener {
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index fc4ef876c..2b2218b52 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -23,22 +23,22 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class RestartSortListener implements SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index cdf570e5e..ab94de669 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -23,24 +23,24 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -90,8 +90,8 @@ public class RestartStreamListener implements SortOperateListener {
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 9c37449f2..1ee9deffb 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -24,24 +24,24 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -97,8 +97,8 @@ public class StartupStreamListener implements SortOperateListener {
             return ListenerResult.success();
         }
 
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 88428c118..59eb35e32 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -88,8 +88,8 @@ public class SuspendSortListener implements SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index c3b2e21ad..f1e13ddf6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class SuspendStreamListener implements SortOperateListener {
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index e7a1c9e12..8a899a390 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.pojo.common.CountInfo;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
@@ -62,6 +61,7 @@ import org.springframework.util.CollectionUtils;
 
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -98,9 +98,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
     @Override
     public ConsumptionSummary getSummary(ConsumptionQuery query) {
-        Map<String, Integer> countMap = consumptionMapper.countByQuery(query)
-                .stream()
-                .collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
+        Map<String, Integer> countMap = new HashMap<>();
+        consumptionMapper.countByQuery(query)
+                .forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));
 
         return ConsumptionSummary.builder()
                 .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index dc51b6e9f..0510ce2f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -42,6 +42,7 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -235,11 +236,11 @@ public class SortSourceServiceImpl implements SortSourceService {
 
             task2Group.forEach((task, groupList) -> {
                 // get topic properties under this cluster and task, group them by group id.
-                Map<String, Map<String, String>> group2topicProp = allStreamInfos.stream()
-                        .filter(stream -> stream.getSortTaskName().equals(task)
-                                && stream.getSortClusterName().equals(clusterName))
-                        .collect(Collectors.toMap(SortSourceStreamInfo::getGroupId,
-                                SortSourceStreamInfo::getExtParamsMap));
+                Map<String, Map<String, String>> group2topicProp = new HashMap<>();
+                allStreamInfos.stream().filter(stream -> stream.getSortTaskName().equals(task)
+                        && stream.getSortClusterName().equals(clusterName)).forEach(
+                        sortSourceStreamInfo -> group2topicProp.put(sortSourceStreamInfo.getGroupId(),
+                                sortSourceStreamInfo.getExtParamsMap()));
 
                 Map<String, CacheZone> cacheZones;
                 try {
@@ -317,9 +318,9 @@ public class SortSourceServiceImpl implements SortSourceService {
         List<String> tags = new ArrayList<>(tag2GroupInfos.keySet());
 
         // Clusters that related to these tags
-        Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = allTag2ClusterInfos.entrySet().stream()
-                .filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = new HashMap<>();
+        allTag2ClusterInfos.entrySet().stream().filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
+                .forEach(entry -> tag2ClusterInfos.put(entry.getKey(), entry.getValue()));
 
         // get CacheZone list
         return tags.stream()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 65aecd2e3..d9c5f1c63 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -70,6 +70,7 @@ import org.springframework.validation.annotation.Validated;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -422,8 +423,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     }
 
     private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
-        Map<String, String> extMap = extInfos.stream()
-                .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> extMap = new HashMap<>();
+        extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue()));
         String type = extMap.get(InlongConstants.SORT_TYPE);
         if (StringUtils.isBlank(type)) {
             return null;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
index b34bc8e83..5eac67e94 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
@@ -62,6 +62,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -118,8 +119,8 @@ public class WorkflowQueryServiceImpl implements WorkflowQueryService {
     public ProcessCountResponse countProcess(ProcessCountRequest request) {
         List<CountInfo> result = processEntityMapper.countByQuery(request);
 
-        Map<String, Integer> countByState = result.stream()
-                .collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
+        Map<String, Integer> countByState = new HashMap<>();
+        result.forEach(countInfo -> countByState.put(countInfo.getKey(), countInfo.getValue()));
 
         return ProcessCountResponse.builder()
                 .totalApplyCount(countByState.values().stream().mapToInt(c -> c).sum())


[inlong] 08/08: [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 555d2ddbeeb04759ccbf83e83de2beb66ab12bc6
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Aug 26 12:52:52 2022 +0800

    [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)
    
    Co-authored-by: Charley <ch...@apache.org>
---
 .../tubemq-client-cpp/src/baseconsumer.cc          | 40 ++++++++++++++++++----
 1 file changed, 34 insertions(+), 6 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 25001de2b..c7152a16e 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -126,13 +126,32 @@ void BaseConsumer::ShutDown() {
   close2Master();
   // 3. close all brokers
   closeAllBrokers();
-  // 4. remove client stub
-  TubeMQService::Instance()->RmvClientObj(shared_from_this());
-  client_index_ = tb_config::kInvalidValue;
+  // 4. check master hb thread status
+  int check_count = 5;
+  while (master_hb_status_.Get() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    if (--check_count <= 0) {
+      LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s",
+               master_hb_status_.Get(), client_uuid_.c_str());  
+      break;
+    }
+  }
+  check_count = 5;
+  while (master_reg_status_.Get() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    if (--check_count <= 0) {
+      LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s",
+               master_reg_status_.Get(), client_uuid_.c_str());  
+      break;
+    }
+  }
   // 5. join hb thread;
   heart_beat_timer_ = nullptr;
   rebalance_thread_ptr_->join();
   rebalance_thread_ptr_ = nullptr;
+  // 6. remove client stub
+  TubeMQService::Instance()->RmvClientObj(shared_from_this());
+  client_index_ = tb_config::kInvalidValue;
   LOG_INFO("[CONSUMER] ShutDown consumer finished, client=%s", client_uuid_.c_str());
 }
 
@@ -520,8 +539,9 @@ void BaseConsumer::heartBeat2Master() {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
+  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
         if (error.Value() != err_code::kErrSuccess) {
           master_sh_retry_cnt_++;
           LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
@@ -559,7 +579,6 @@ void BaseConsumer::heartBeat2Master() {
           }
         }
         heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
-        auto self = shared_from_this();
         heart_beat_timer_->async_wait([self, this](const std::error_code& ec) {
           if (ec) {
             return;
@@ -665,6 +684,10 @@ void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
     rmtdata_cache_.FilterPartitions(subscribe_info, subscribed_partitions, unsub_partitions);
     if (!unsub_partitions.empty()) {
       for (it = unsub_partitions.begin(); it != unsub_partitions.end(); it++) {
+        if (!isClientRunning()) {
+          LOG_TRACE("[processConnect2Broker] client stopped, break pos1, clientid=%s", client_uuid_.c_str());
+          break;
+        }
         LOG_TRACE("[processConnect2Broker] connect to %s, clientid=%s",
                   it->GetPartitionKey().c_str(), client_uuid_.c_str());
         auto request = std::make_shared<RequestContext>();
@@ -681,6 +704,10 @@ void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
         // send message to target
         ResponseContext response_context;
         ErrorCode error = SyncRequest(response_context, request, req_protocol);
+        if (!isClientRunning()) {
+          LOG_TRACE("[processConnect2Broker] client stopped, break pos2, clientid=%s", client_uuid_.c_str());
+          break;
+        }
         if (error.Value() != err_code::kErrSuccess) {
           LOG_WARN("[Connect2Broker] request network failure to (%s:%d) : %s",
                    it->GetBrokerHost().c_str(), it->GetBrokerPort(), error.Message().c_str());
@@ -759,8 +786,9 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
+  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
         if (error.Value() != err_code::kErrSuccess) {
           LOG_WARN("[Heartbeat2Broker] request network  to failure (%s), ression is %s",
                    broker_info.GetAddrInfo().c_str(), error.Message().c_str());