You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 02:39:18 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5666][Sort] Supports init multiple Sort SDK to improve performance (#5684)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 7e0293147 [INLONG-5666][Sort] Supports init multiple Sort SDK to improve performance (#5684)
7e0293147 is described below

commit 7e0293147fd93da8a597b08df0db1ee84972207c
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Tue Sep 6 09:57:15 2022 +0800

    [INLONG-5666][Sort] Supports init multiple Sort SDK to improve performance (#5684)
---
 .../standalone/source/sortsdk/SortSdkSource.java   | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index c80fdbc9f..18cef6065 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -43,7 +43,9 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -81,6 +83,9 @@ public final class SortSdkSource extends AbstractSource
     // Default consume strategy of {@link SortClient}.
     private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
 
+    private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+    private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
+
     private String taskName;
 
     // Context of SortSdkSource.
@@ -96,7 +101,7 @@ public final class SortSdkSource extends AbstractSource
     private ScheduledExecutorService pool;
 
     // {@link SortClient}.
-    private SortClient sortClient;
+    private List<SortClient> sortClients = new ArrayList<>();
 
     /**
      * Start SortSdkSource.
@@ -104,7 +109,10 @@ public final class SortSdkSource extends AbstractSource
     @Override
     public synchronized void start() {
         LOG.info("start to SortSdkSource:{}", taskName);
-        this.sortClient = this.newClient(taskName);
+        int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM);
+        for (int i = 0; i < sortSdkClientNum; i++) {
+            this.sortClients.add(this.newClient(taskName));
+        }
     }
 
     /**
@@ -114,7 +122,7 @@ public final class SortSdkSource extends AbstractSource
     public void stop() {
         pool.shutdownNow();
         LOG.info("Close sort client {}.", taskName);
-        if (sortClient != null) {
+        for (SortClient sortClient : sortClients) {
             sortClient.getConfig().setStopConsume(true);
             sortClient.close();
         }
@@ -126,7 +134,7 @@ public final class SortSdkSource extends AbstractSource
     @Override
     public void run() {
         LOG.info("start to reload SortSdkSource:{}", taskName);
-        if (sortClient != null) {
+        for (SortClient sortClient : sortClients) {
             sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
         }
     }
@@ -230,9 +238,8 @@ public final class SortSdkSource extends AbstractSource
      * @return Map
      */
     private Map<String, String> getSortClientConfigParameters() {
-        Map<String, String> sortSdkParams = new HashMap<>();
         Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
-        sortSdkParams.putAll(commonParams);
+        Map<String, String> sortSdkParams = new HashMap<>(commonParams);
         SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
         if (taskConfig != null) {
             Map<String, String> sinkParams = taskConfig.getSinkParams();
@@ -249,7 +256,9 @@ public final class SortSdkSource extends AbstractSource
      */
     @Override
     public void stopConsumer() {
-        sortClient.getConfig().setStopConsume(true);
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(true);
+        }
     }
 
     /**
@@ -257,6 +266,8 @@ public final class SortSdkSource extends AbstractSource
      */
     @Override
     public void recoverConsumer() {
-        sortClient.getConfig().setStopConsume(false);
+        for (SortClient sortClient : sortClients) {
+            sortClient.getConfig().setStopConsume(false);
+        }
     }
 }