You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/28 09:58:25 UTC

[incubator-inlong] branch master updated: [INLONG-2730][Sort] Reduce the number of AsyncProducerClient (#2777)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2ab24  [INLONG-2730][Sort] Reduce the number of AsyncProducerClient (#2777)
8a2ab24 is described below

commit 8a2ab24e7cf6f57de1c601844ce512a04f44a54c
Author: imvan <de...@pku.edu.cn>
AuthorDate: Mon Feb 28 17:58:17 2022 +0800

    [INLONG-2730][Sort] Reduce the number of AsyncProducerClient (#2777)
---
 .../sort/standalone/sink/cls/ClsSinkContext.java   | 51 ++++++++++++++--------
 1 file changed, 32 insertions(+), 19 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index e2efb33..b6e4b69 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -34,8 +34,10 @@ import org.slf4j.Logger;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * Cls sink context.
@@ -123,33 +125,44 @@ public class ClsSinkContext extends SinkContext {
 
     /**
      * Close expire clients and start new clients.
-     * <p>Each client response for data of one uid.</p>
-     * <p>First, find all UIDs that are in the active clientMap but not in the updated id config (or to say EXPIRE UID),
-     * and put those clients into deletingClientsMap.
+     * 
+     * <p>Each client is responsible for the data of one secretId.</p>
+     * <p>First, find all secretIds that are in the active clientMap
+     * but not in the updated id config (i.e. EXPIRED secretIds), and put those clients into deletingClients.
      * The real close process will be done at the beginning of next period of reloading.
-     * Second, find all UIDs that in the updated id config but not in the active clientMap(or to say NEW UID),
-     * and start new clients for these UIDs and put them into the active clientMap.</p>
+     * Second, find all secretIds that are in the updated id config
+     * but not in the active clientMap (i.e. NEW secretIds),
+     * and start new clients for these secretIds and put them into the active clientMap.</p>
      */
     private void reloadClients() {
+        // get update secretIds
+        Set<String> updateSecretIdSet = idConfigMap
+                .values()
+                .stream()
+                .map(ClsIdConfig::getSecretId)
+                .collect(Collectors.toSet());
+
+        // remove expire client
         clientMap.keySet()
                 .stream()
-                .filter(uid -> !idConfigMap.containsKey(uid))
+                .filter(secretId -> !updateSecretIdSet.contains(secretId))
                 .forEach(this::removeExpireClient);
-        idConfigMap.keySet()
-                .stream()
-                .filter(uid -> !clientMap.containsKey(uid))
+
+        // start new client
+        updateSecretIdSet.stream()
+                .filter(secretId -> !clientMap.containsKey(secretId))
                 .forEach(this::startNewClient);
     }
 
     /**
      * Start new cls client and put it to the active clientMap.
      *
-     * @param uid UID of new client.
+     * @param secretId SecretId of new client.
      */
-    private void startNewClient(String uid) {
-        ClsIdConfig idConfig = idConfigMap.get(uid);
+    private void startNewClient(String secretId) {
+        ClsIdConfig idConfig = idConfigMap.get(secretId);
         if (idConfig == null) {
-            LOG.error("Start client failed, there is not cls config of {}", uid);
+            LOG.error("Start client failed, there is not cls config of {}", secretId);
             return;
         }
         AsyncProducerConfig producerConfig = new AsyncProducerConfig(
@@ -159,7 +172,7 @@ public class ClsSinkContext extends SinkContext {
                 NetworkUtils.getLocalMachineIP());
         // todo set other configs
         AsyncProducerClient client = new AsyncProducerClient(producerConfig);
-        clientMap.put(uid, client);
+        clientMap.put(secretId, client);
     }
 
     /**
@@ -168,14 +181,14 @@ public class ClsSinkContext extends SinkContext {
      * Which will happen when worker thread get the client and ready to send msg,
      * while the reload thread try to close it.</P>
      *
-     * @param uid UID of expire client.
+     * @param secretId SecretId of the expired client.
      */
-    private void removeExpireClient(String uid) {
-        AsyncProducerClient client = clientMap.get(uid);
+    private void removeExpireClient(String secretId) {
+        AsyncProducerClient client = clientMap.get(secretId);
         if (client == null) {
-            LOG.error("Remove client failed, there is not client of {}", uid);
+            LOG.error("Remove client failed, there is not client of {}", secretId);
             return;
         }
-        deletingClients.add(clientMap.remove(uid));
+        deletingClients.add(clientMap.remove(secretId));
     }
 }