You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 06:20:22 UTC

[incubator-inlong] branch master updated: [INLONG-4559][Audit] Support select channels by random routing (#4562)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 734c05fb7 [INLONG-4559][Audit] Support select channels by random routing (#4562)
734c05fb7 is described below

commit 734c05fb7e42e90c9cdf472201a54487db1c7001
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Fri Jun 10 14:20:17 2022 +0800

    [INLONG-4559][Audit] Support select channels by random routing (#4562)
---
 .../apache/inlong/audit/send/SenderChannel.java    | 83 +++++++++++++++++++++-
 .../org/apache/inlong/audit/send/SenderGroup.java  | 83 +++++-----------------
 .../apache/inlong/audit/send/SenderManager.java    |  6 +-
 .../java/org/apache/inlong/audit/util/Config.java  | 37 +++-------
 .../apache/inlong/audit/send/SenderGroupTest.java  |  2 +-
 5 files changed, 112 insertions(+), 99 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 224e5d2e5..85f42b4e3 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -17,26 +17,43 @@
 
 package org.apache.inlong.audit.send;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.inlong.audit.util.EventLoopUtil;
 import org.apache.inlong.audit.util.IpPort;
 import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 
 public class SenderChannel {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SenderChannel.class);
+
+    public static final int DEFAULT_SEND_THREADNUM = 1;
+    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
+
     private IpPort ipPort;
     private Channel channel;
     private Semaphore packToken;
+    private Bootstrap client;
+    private SenderManager senderManager;
 
     /**
      * Constructor
      *
-     * @param channel
      * @param ipPort
      */
-    public SenderChannel(Channel channel, IpPort ipPort, int maxSynchRequest) {
-        this.channel = channel;
+    public SenderChannel(IpPort ipPort, int maxSynchRequest, SenderManager senderManager) {
         this.ipPort = ipPort;
         this.packToken = new Semaphore(maxSynchRequest);
+        this.senderManager = senderManager;
     }
 
     /**
@@ -98,4 +115,64 @@ public class SenderChannel {
     public void setChannel(Channel channel) {
         this.channel = channel;
     }
+
+    private void init() {
+        ThreadFactory selfDefineFactory = new DefaultThreadFactory("audit-client-io",
+                Thread.currentThread().isDaemon());
+
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(DEFAULT_SEND_THREADNUM,
+                false, selfDefineFactory);
+        client = new Bootstrap();
+        client.group(eventLoopGroup);
+        client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
+        client.option(ChannelOption.SO_KEEPALIVE, true);
+        client.option(ChannelOption.TCP_NODELAY, true);
+        client.option(ChannelOption.SO_REUSEADDR, true);
+        client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
+        client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
+        SenderHandler senderHandler = new SenderHandler(senderManager);
+        client.handler(new ClientPipelineFactory(senderHandler));
+    }
+
+    /**
+     * connect channel
+     *
+     * @return
+     */
+    public boolean connect() {
+        if (checkConnect(this.channel)) {
+            return true;
+        }
+        try {
+            init();
+            synchronized (client) {
+                ChannelFuture future = client.connect(this.ipPort.addr).sync();
+                this.channel = future.channel();
+            }
+        } catch (Throwable e) {
+            LOG.error("connect {} failed. {}", this.getIpPort(), e.getMessage());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * check channeel connect
+     *
+     * @param channel
+     * @return
+     */
+    private boolean checkConnect(Channel channel) {
+        try {
+            if (channel == null) {
+                return false;
+            }
+            if (channel.isWritable() || channel.isOpen() || channel.isActive()) {
+                return true;
+            }
+        } catch (Throwable ex) {
+            LOG.error("check connect ex." + ex.getMessage());
+        }
+        return false;
+    }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
index 793b99308..7598fa654 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -17,16 +17,8 @@
 
 package org.apache.inlong.audit.send;
 
-import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.concurrent.ThreadFactory;
-import org.apache.inlong.audit.util.EventLoopUtil;
 import org.apache.inlong.audit.util.IpPort;
 import org.apache.inlong.audit.util.SenderResult;
 import org.slf4j.Logger;
@@ -47,45 +39,27 @@ public class SenderGroup {
     public static final int DEFAULT_WAIT_TIMES = 10000;
     public static final int WAIT_INTERVAL = 1;
     public static final int DEFAULT_SYNCH_REQUESTS = 1;
-    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
-    public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
+    public static final int RANDOM_MIN = 0;
 
-    private Bootstrap client;
     private List<LinkedBlockingQueue<SenderChannel>> channelGroups = new ArrayList<>();
     private int mIndex = 0;
     private List<SenderChannel> deleteChannels = new ArrayList<>();
     private ConcurrentHashMap<String, SenderChannel> totalChannels = new ConcurrentHashMap<>();
 
-    private int senderThreadNum;
     private int waitChannelTimes = DEFAULT_WAIT_TIMES;
     private int waitChannelIntervalMs = WAIT_INTERVAL;
     private int maxSynchRequest = DEFAULT_SYNCH_REQUESTS;
     private boolean hasSendError = false;
 
+    private SenderManager senderManager;
+
     /**
      * constructor
      *
-     * @param senderThreadNum
-     * @param clientHandler
+     * @param senderManager
      */
-    public SenderGroup(int senderThreadNum, SimpleChannelInboundHandler clientHandler) {
-        this.senderThreadNum = senderThreadNum;
-
-        ThreadFactory selfDefineFactory  = new DefaultThreadFactory("audit-client-io",
-                Thread.currentThread().isDaemon());
-
-        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(this.senderThreadNum,
-                false, selfDefineFactory);
-        client = new Bootstrap();
-        client.group(eventLoopGroup);
-        client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
-        client.option(ChannelOption.SO_KEEPALIVE, true);
-        client.option(ChannelOption.TCP_NODELAY, true);
-        client.option(ChannelOption.SO_REUSEADDR, true);
-        client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
-        client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
-        client.handler(new ClientPipelineFactory(clientHandler));
-
+    public SenderGroup(SenderManager senderManager) {
+        this.senderManager = senderManager;
         /*
          * init add two list for update config
          */
@@ -108,20 +82,26 @@ public class SenderGroup {
                 return new SenderResult("channels is empty", 0, false);
             }
             boolean isOk = false;
-            for (int tryIndex = 0; tryIndex < waitChannelTimes; tryIndex++) {
+            for (int tryIndex = 0; tryIndex < MAX_SEND_TIMES; tryIndex++) {
+                int random = RANDOM_MIN + (int) (Math.random() * (channels.size() - RANDOM_MIN));
                 channels = channelGroups.get(mIndex);
                 for (int i = 0; i < channels.size(); i++) {
                     channel = channels.poll();
                     if (channel.tryAcquire()) {
-                        isOk = true;
-                        break;
+                        if (random == i && channel.connect()) {
+                            isOk = true;
+                            break;
+                        }
+                        channel.release();
                     }
                     channels.offer(channel);
                     channel = null;
                 }
+
                 if (isOk) {
                     break;
                 }
+
                 try {
                     Thread.sleep(waitChannelIntervalMs);
                 } catch (Throwable e) {
@@ -132,18 +112,16 @@ public class SenderGroup {
                 logger.error("can not get a channel");
                 return new SenderResult("can not get a channel", 0, false);
             }
+
             ChannelFuture t = null;
             if (channel.getChannel().isWritable()) {
                 t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
                 if (!t.isSuccess()) {
                     if (!channel.getChannel().isActive()) {
-                        reconnect(channel);
+                        channel.connect();
                     }
                     t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
                 }
-            } else {
-                reconnect(channel);
-                t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
             }
             return new SenderResult(channel.getIpPort().ip, channel.getIpPort().port, t.isSuccess());
         } catch (Throwable ex) {
@@ -207,8 +185,7 @@ public class SenderGroup {
                     if (ipPortObj == null) {
                         continue;
                     }
-                    ChannelFuture future = client.connect(ipPortObj.addr).await();
-                    channel = new SenderChannel(future.channel(), ipPortObj, maxSynchRequest);
+                    channel = new SenderChannel(ipPortObj, maxSynchRequest, senderManager);
                     newChannels.add(channel);
                     totalChannels.put(ipPort, channel);
                 } catch (Exception e) {
@@ -230,30 +207,6 @@ public class SenderGroup {
         }
     }
 
-    /**
-     * reconnect
-     *
-     * @param channel
-     */
-    private void reconnect(SenderChannel channel) {
-        try {
-            synchronized (channel) {
-                if (channel.getChannel().isOpen()) {
-                    return;
-                }
-
-                Channel oldChannel = channel.getChannel();
-                ChannelFuture future = client.connect(channel.getIpPort().addr).await();
-                Channel newChannel = future.channel();
-                channel.setChannel(newChannel);
-                oldChannel.disconnect();
-                oldChannel.close();
-            }
-        } catch (Throwable e) {
-            logger.error("reconnect failed." + e.getMessage());
-        }
-    }
-
     /**
      * get hasSendError
      *
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index d7c9139d9..b841014d5 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class SenderManager {
     private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
-    public static final int DEFAULT_SEND_THREADNUM = 2;
     public static final Long MAX_REQUEST_ID = 1000000000L;
     private static final int SEND_INTERVAL_MS = 20;
     public static final int ALL_CONNECT_CHANNEL = -1;
@@ -80,8 +79,7 @@ public class SenderManager {
         try {
             this.auditConfig = config;
             this.maxConnectChannels = maxConnectChannels;
-            SenderHandler clientHandler = new SenderHandler(this);
-            this.sender = new SenderGroup(DEFAULT_SEND_THREADNUM, clientHandler);
+            this.sender = new SenderGroup(this);
         } catch (Exception ex) {
             logger.error(ex.getMessage());
         }
@@ -154,7 +152,7 @@ public class SenderManager {
             logger.warn("send data is empty!");
             return;
         }
-        ByteBuf dataBuf =  ByteBufAllocator.DEFAULT.buffer(data.length);
+        ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
         dataBuf.writeBytes(data);
         SenderResult result = this.sender.send(dataBuf);
         if (!result.result) {
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
index f7ce31489..32f9e7f68 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
@@ -73,37 +72,23 @@ public class Config {
     }
 
     private void initDockerId() {
-        BufferedReader in = null;
-        try {
-            File file = new File("/proc/self/cgroup");
-            if (file.exists() == false) {
-                return;
-            }
-            in = new BufferedReader(new FileReader("/proc/self/cgroup"));
+        File file = new File("/proc/self/cgroup");
+        if (!file.exists()) {
+            return;
+        }
+        try (BufferedReader in = new BufferedReader(new FileReader("/proc/self/cgroup"))) {
             String dockerID = in.readLine();
-            if (dockerID.equals("") == false) {
+            if (dockerID != null) {
                 int n = dockerID.indexOf("/");
                 String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
                 n = dockerID2.indexOf("/");
-                dockerId = dockerID2.substring(n + 1, 12);
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage());
-            return;
-        } catch (NullPointerException e2) {
-            logger.error(e2.getMessage());
-            return;
-        } catch (Exception e3) {
-            logger.error(e3.getMessage());
-            return;
-        } finally {
-            if (in != null) {
-                try {
-                    in.close();
-                } catch (IOException e4) {
-                    logger.error(e4.getMessage());
+                if (dockerID2.length() > 12) {
+                    dockerId = dockerID2.substring(n + 1, 12);
                 }
+                in.close();
             }
+        } catch (Exception ex) {
+            logger.error(ex.toString());
         }
     }
 }
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
index d375a457b..bea1f2419 100644
--- a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
@@ -27,7 +27,7 @@ public class SenderGroupTest {
     AuditConfig testConfig = new AuditConfig();
     SenderManager testManager = new SenderManager(testConfig);
     SenderHandler clientHandler = new org.apache.inlong.audit.send.SenderHandler(testManager);
-    SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(10, clientHandler);
+    SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(testManager);
 
     @Test
     public void isHasSendError() {