You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/08 02:39:18 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5666][Sort] Supports init multiple Sort SDK to improve performance (#5684)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 7e0293147 [INLONG-5666][Sort] Supports init multiple Sort SDK to improve performance (#5684)
7e0293147 is described below
commit 7e0293147fd93da8a597b08df0db1ee84972207c
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Tue Sep 6 09:57:15 2022 +0800
[INLONG-5666][Sort] Supports init multiple Sort SDK to improve performance (#5684)
---
.../standalone/source/sortsdk/SortSdkSource.java | 27 +++++++++++++++-------
1 file changed, 19 insertions(+), 8 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index c80fdbc9f..18cef6065 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -43,7 +43,9 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -81,6 +83,9 @@ public final class SortSdkSource extends AbstractSource
// Default consume strategy of {@link SortClient}.
private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+ private static final String KEY_SORT_SDK_CLIENT_NUM = "sortSdkClientNum";
+ private static final int DEFAULT_SORT_SDK_CLIENT_NUM = 1;
+
private String taskName;
// Context of SortSdkSource.
@@ -96,7 +101,7 @@ public final class SortSdkSource extends AbstractSource
private ScheduledExecutorService pool;
// {@link SortClient}.
- private SortClient sortClient;
+ private List<SortClient> sortClients = new ArrayList<>();
/**
* Start SortSdkSource.
@@ -104,7 +109,10 @@ public final class SortSdkSource extends AbstractSource
@Override
public synchronized void start() {
LOG.info("start to SortSdkSource:{}", taskName);
- this.sortClient = this.newClient(taskName);
+ int sortSdkClientNum = CommonPropertiesHolder.getInteger(KEY_SORT_SDK_CLIENT_NUM, DEFAULT_SORT_SDK_CLIENT_NUM);
+ for (int i = 0; i < sortSdkClientNum; i++) {
+ this.sortClients.add(this.newClient(taskName));
+ }
}
/**
@@ -114,7 +122,7 @@ public final class SortSdkSource extends AbstractSource
public void stop() {
pool.shutdownNow();
LOG.info("Close sort client {}.", taskName);
- if (sortClient != null) {
+ for (SortClient sortClient : sortClients) {
sortClient.getConfig().setStopConsume(true);
sortClient.close();
}
@@ -126,7 +134,7 @@ public final class SortSdkSource extends AbstractSource
@Override
public void run() {
LOG.info("start to reload SortSdkSource:{}", taskName);
- if (sortClient != null) {
+ for (SortClient sortClient : sortClients) {
sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
}
}
@@ -230,9 +238,8 @@ public final class SortSdkSource extends AbstractSource
* @return Map
*/
private Map<String, String> getSortClientConfigParameters() {
- Map<String, String> sortSdkParams = new HashMap<>();
Map<String, String> commonParams = CommonPropertiesHolder.getContext().getSubProperties(SORT_SDK_PREFIX);
- sortSdkParams.putAll(commonParams);
+ Map<String, String> sortSdkParams = new HashMap<>(commonParams);
SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
if (taskConfig != null) {
Map<String, String> sinkParams = taskConfig.getSinkParams();
@@ -249,7 +256,9 @@ public final class SortSdkSource extends AbstractSource
*/
@Override
public void stopConsumer() {
- sortClient.getConfig().setStopConsume(true);
+ for (SortClient sortClient : sortClients) {
+ sortClient.getConfig().setStopConsume(true);
+ }
}
/**
@@ -257,6 +266,8 @@ public final class SortSdkSource extends AbstractSource
*/
@Override
public void recoverConsumer() {
- sortClient.getConfig().setStopConsume(false);
+ for (SortClient sortClient : sortClients) {
+ sortClient.getConfig().setStopConsume(false);
+ }
}
}