You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/28 09:58:25 UTC
[incubator-inlong] branch master updated: [INLONG-2730][Sort] Reduce the number of AsyncProducerClient (#2777)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8a2ab24 [INLONG-2730][Sort] Reduce the number of AsyncProducerClient (#2777)
8a2ab24 is described below
commit 8a2ab24e7cf6f57de1c601844ce512a04f44a54c
Author: imvan <de...@pku.edu.cn>
AuthorDate: Mon Feb 28 17:58:17 2022 +0800
[INLONG-2730][Sort] Reduce the number of AsyncProducerClient (#2777)
---
.../sort/standalone/sink/cls/ClsSinkContext.java | 51 ++++++++++++++--------
1 file changed, 32 insertions(+), 19 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index e2efb33..b6e4b69 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -34,8 +34,10 @@ import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* Cls sink context.
@@ -123,33 +125,44 @@ public class ClsSinkContext extends SinkContext {
/**
* Close expire clients and start new clients.
- * <p>Each client response for data of one uid.</p>
- * <p>First, find all UIDs that are in the active clientMap but not in the updated id config (or to say EXPIRE UID),
- * and put those clients into deletingClientsMap.
+ *
+ * <p>Each client response for data of one secretId.</p>
+ * <p>First, find all secretId that are in the active clientMap
+ * but not in the updated id config (or to say EXPIRE secretId), and put those clients into deletingClientsMap.
* The real close process will be done at the beginning of next period of reloading.
- * Second, find all UIDs that in the updated id config but not in the active clientMap(or to say NEW UID),
- * and start new clients for these UIDs and put them into the active clientMap.</p>
+ * Second, find all secretIds that in the updated id config
+ * but not in the active clientMap(or to say NEW secretId),
+ * and start new clients for these secretId and put them into the active clientMap.</p>
*/
private void reloadClients() {
+ // get update secretIds
+ Set<String> updateSecretIdSet = idConfigMap
+ .values()
+ .stream()
+ .map(ClsIdConfig::getSecretId)
+ .collect(Collectors.toSet());
+
+ // remove expire client
clientMap.keySet()
.stream()
- .filter(uid -> !idConfigMap.containsKey(uid))
+ .filter(secretId -> !updateSecretIdSet.contains(secretId))
.forEach(this::removeExpireClient);
- idConfigMap.keySet()
- .stream()
- .filter(uid -> !clientMap.containsKey(uid))
+
+ // start new client
+ updateSecretIdSet.stream()
+ .filter(secretId -> !clientMap.containsKey(secretId))
.forEach(this::startNewClient);
}
/**
* Start new cls client and put it to the active clientMap.
*
- * @param uid UID of new client.
+ * @param secretId SecretId of new client.
*/
- private void startNewClient(String uid) {
- ClsIdConfig idConfig = idConfigMap.get(uid);
+ private void startNewClient(String secretId) {
+ ClsIdConfig idConfig = idConfigMap.get(secretId);
if (idConfig == null) {
- LOG.error("Start client failed, there is not cls config of {}", uid);
+ LOG.error("Start client failed, there is not cls config of {}", secretId);
return;
}
AsyncProducerConfig producerConfig = new AsyncProducerConfig(
@@ -159,7 +172,7 @@ public class ClsSinkContext extends SinkContext {
NetworkUtils.getLocalMachineIP());
// todo set other configs
AsyncProducerClient client = new AsyncProducerClient(producerConfig);
- clientMap.put(uid, client);
+ clientMap.put(secretId, client);
}
/**
@@ -168,14 +181,14 @@ public class ClsSinkContext extends SinkContext {
* Which will happen when worker thread get the client and ready to send msg,
* while the reload thread try to close it.</P>
*
- * @param uid UID of expire client.
+ * @param secretId SecretId of expire client.
*/
- private void removeExpireClient(String uid) {
- AsyncProducerClient client = clientMap.get(uid);
+ private void removeExpireClient(String secretId) {
+ AsyncProducerClient client = clientMap.get(secretId);
if (client == null) {
- LOG.error("Remove client failed, there is not client of {}", uid);
+ LOG.error("Remove client failed, there is not client of {}", secretId);
return;
}
- deletingClients.add(clientMap.remove(uid));
+ deletingClients.add(clientMap.remove(secretId));
}
}