You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 06:20:22 UTC
[incubator-inlong] branch master updated: [INLONG-4559][Audit] Support select channels by random routing (#4562)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 734c05fb7 [INLONG-4559][Audit] Support select channels by random routing (#4562)
734c05fb7 is described below
commit 734c05fb7e42e90c9cdf472201a54487db1c7001
Author: doleyzi <43...@users.noreply.github.com>
AuthorDate: Fri Jun 10 14:20:17 2022 +0800
[INLONG-4559][Audit] Support select channels by random routing (#4562)
---
.../apache/inlong/audit/send/SenderChannel.java | 83 +++++++++++++++++++++-
.../org/apache/inlong/audit/send/SenderGroup.java | 83 +++++-----------------
.../apache/inlong/audit/send/SenderManager.java | 6 +-
.../java/org/apache/inlong/audit/util/Config.java | 37 +++-------
.../apache/inlong/audit/send/SenderGroupTest.java | 2 +-
5 files changed, 112 insertions(+), 99 deletions(-)
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 224e5d2e5..85f42b4e3 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -17,26 +17,43 @@
package org.apache.inlong.audit.send;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.inlong.audit.util.EventLoopUtil;
import org.apache.inlong.audit.util.IpPort;
import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
public class SenderChannel {
+ private static final Logger LOG = LoggerFactory.getLogger(SenderChannel.class);
+
+ public static final int DEFAULT_SEND_THREADNUM = 1;
+ public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
+ public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
+
private IpPort ipPort;
private Channel channel;
private Semaphore packToken;
+ private Bootstrap client;
+ private SenderManager senderManager;
/**
* Constructor
*
- * @param channel
* @param ipPort
*/
- public SenderChannel(Channel channel, IpPort ipPort, int maxSynchRequest) {
- this.channel = channel;
+ public SenderChannel(IpPort ipPort, int maxSynchRequest, SenderManager senderManager) {
this.ipPort = ipPort;
this.packToken = new Semaphore(maxSynchRequest);
+ this.senderManager = senderManager;
}
/**
@@ -98,4 +115,64 @@ public class SenderChannel {
public void setChannel(Channel channel) {
this.channel = channel;
}
+
+ private void init() {
+ ThreadFactory selfDefineFactory = new DefaultThreadFactory("audit-client-io",
+ Thread.currentThread().isDaemon());
+
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(DEFAULT_SEND_THREADNUM,
+ false, selfDefineFactory);
+ client = new Bootstrap();
+ client.group(eventLoopGroup);
+ client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
+ client.option(ChannelOption.SO_KEEPALIVE, true);
+ client.option(ChannelOption.TCP_NODELAY, true);
+ client.option(ChannelOption.SO_REUSEADDR, true);
+ client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
+ client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
+ SenderHandler senderHandler = new SenderHandler(senderManager);
+ client.handler(new ClientPipelineFactory(senderHandler));
+ }
+
+ /**
+ * connect channel
+ *
+ * @return
+ */
+ public boolean connect() {
+ if (checkConnect(this.channel)) {
+ return true;
+ }
+ try {
+ init();
+ synchronized (client) {
+ ChannelFuture future = client.connect(this.ipPort.addr).sync();
+ this.channel = future.channel();
+ }
+ } catch (Throwable e) {
+ LOG.error("connect {} failed. {}", this.getIpPort(), e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * check channel connect
+ *
+ * @param channel
+ * @return
+ */
+ private boolean checkConnect(Channel channel) {
+ try {
+ if (channel == null) {
+ return false;
+ }
+ if (channel.isWritable() || channel.isOpen() || channel.isActive()) {
+ return true;
+ }
+ } catch (Throwable ex) {
+ LOG.error("check connect ex." + ex.getMessage());
+ }
+ return false;
+ }
}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
index 793b99308..7598fa654 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderGroup.java
@@ -17,16 +17,8 @@
package org.apache.inlong.audit.send;
-import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.concurrent.ThreadFactory;
-import org.apache.inlong.audit.util.EventLoopUtil;
import org.apache.inlong.audit.util.IpPort;
import org.apache.inlong.audit.util.SenderResult;
import org.slf4j.Logger;
@@ -47,45 +39,27 @@ public class SenderGroup {
public static final int DEFAULT_WAIT_TIMES = 10000;
public static final int WAIT_INTERVAL = 1;
public static final int DEFAULT_SYNCH_REQUESTS = 1;
- public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
- public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
+ public static final int RANDOM_MIN = 0;
- private Bootstrap client;
private List<LinkedBlockingQueue<SenderChannel>> channelGroups = new ArrayList<>();
private int mIndex = 0;
private List<SenderChannel> deleteChannels = new ArrayList<>();
private ConcurrentHashMap<String, SenderChannel> totalChannels = new ConcurrentHashMap<>();
- private int senderThreadNum;
private int waitChannelTimes = DEFAULT_WAIT_TIMES;
private int waitChannelIntervalMs = WAIT_INTERVAL;
private int maxSynchRequest = DEFAULT_SYNCH_REQUESTS;
private boolean hasSendError = false;
+ private SenderManager senderManager;
+
/**
* constructor
*
- * @param senderThreadNum
- * @param clientHandler
+ * @param senderManager
*/
- public SenderGroup(int senderThreadNum, SimpleChannelInboundHandler clientHandler) {
- this.senderThreadNum = senderThreadNum;
-
- ThreadFactory selfDefineFactory = new DefaultThreadFactory("audit-client-io",
- Thread.currentThread().isDaemon());
-
- EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(this.senderThreadNum,
- false, selfDefineFactory);
- client = new Bootstrap();
- client.group(eventLoopGroup);
- client.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
- client.option(ChannelOption.SO_KEEPALIVE, true);
- client.option(ChannelOption.TCP_NODELAY, true);
- client.option(ChannelOption.SO_REUSEADDR, true);
- client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
- client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
- client.handler(new ClientPipelineFactory(clientHandler));
-
+ public SenderGroup(SenderManager senderManager) {
+ this.senderManager = senderManager;
/*
* init add two list for update config
*/
@@ -108,20 +82,26 @@ public class SenderGroup {
return new SenderResult("channels is empty", 0, false);
}
boolean isOk = false;
- for (int tryIndex = 0; tryIndex < waitChannelTimes; tryIndex++) {
+ for (int tryIndex = 0; tryIndex < MAX_SEND_TIMES; tryIndex++) {
+ int random = RANDOM_MIN + (int) (Math.random() * (channels.size() - RANDOM_MIN));
channels = channelGroups.get(mIndex);
for (int i = 0; i < channels.size(); i++) {
channel = channels.poll();
if (channel.tryAcquire()) {
- isOk = true;
- break;
+ if (random == i && channel.connect()) {
+ isOk = true;
+ break;
+ }
+ channel.release();
}
channels.offer(channel);
channel = null;
}
+
if (isOk) {
break;
}
+
try {
Thread.sleep(waitChannelIntervalMs);
} catch (Throwable e) {
@@ -132,18 +112,16 @@ public class SenderGroup {
logger.error("can not get a channel");
return new SenderResult("can not get a channel", 0, false);
}
+
ChannelFuture t = null;
if (channel.getChannel().isWritable()) {
t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
if (!t.isSuccess()) {
if (!channel.getChannel().isActive()) {
- reconnect(channel);
+ channel.connect();
}
t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
}
- } else {
- reconnect(channel);
- t = channel.getChannel().writeAndFlush(dataBuf).sync().await();
}
return new SenderResult(channel.getIpPort().ip, channel.getIpPort().port, t.isSuccess());
} catch (Throwable ex) {
@@ -207,8 +185,7 @@ public class SenderGroup {
if (ipPortObj == null) {
continue;
}
- ChannelFuture future = client.connect(ipPortObj.addr).await();
- channel = new SenderChannel(future.channel(), ipPortObj, maxSynchRequest);
+ channel = new SenderChannel(ipPortObj, maxSynchRequest, senderManager);
newChannels.add(channel);
totalChannels.put(ipPort, channel);
} catch (Exception e) {
@@ -230,30 +207,6 @@ public class SenderGroup {
}
}
- /**
- * reconnect
- *
- * @param channel
- */
- private void reconnect(SenderChannel channel) {
- try {
- synchronized (channel) {
- if (channel.getChannel().isOpen()) {
- return;
- }
-
- Channel oldChannel = channel.getChannel();
- ChannelFuture future = client.connect(channel.getIpPort().addr).await();
- Channel newChannel = future.channel();
- channel.setChannel(newChannel);
- oldChannel.disconnect();
- oldChannel.close();
- }
- } catch (Throwable e) {
- logger.error("reconnect failed." + e.getMessage());
- }
- }
-
/**
* get hasSendError
*
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index d7c9139d9..b841014d5 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class SenderManager {
private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
- public static final int DEFAULT_SEND_THREADNUM = 2;
public static final Long MAX_REQUEST_ID = 1000000000L;
private static final int SEND_INTERVAL_MS = 20;
public static final int ALL_CONNECT_CHANNEL = -1;
@@ -80,8 +79,7 @@ public class SenderManager {
try {
this.auditConfig = config;
this.maxConnectChannels = maxConnectChannels;
- SenderHandler clientHandler = new SenderHandler(this);
- this.sender = new SenderGroup(DEFAULT_SEND_THREADNUM, clientHandler);
+ this.sender = new SenderGroup(this);
} catch (Exception ex) {
logger.error(ex.getMessage());
}
@@ -154,7 +152,7 @@ public class SenderManager {
logger.warn("send data is empty!");
return;
}
- ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
+ ByteBuf dataBuf = ByteBufAllocator.DEFAULT.buffer(data.length);
dataBuf.writeBytes(data);
SenderResult result = this.sender.send(dataBuf);
if (!result.result) {
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
index f7ce31489..32f9e7f68 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
@@ -73,37 +72,23 @@ public class Config {
}
private void initDockerId() {
- BufferedReader in = null;
- try {
- File file = new File("/proc/self/cgroup");
- if (file.exists() == false) {
- return;
- }
- in = new BufferedReader(new FileReader("/proc/self/cgroup"));
+ File file = new File("/proc/self/cgroup");
+ if (!file.exists()) {
+ return;
+ }
+ try (BufferedReader in = new BufferedReader(new FileReader("/proc/self/cgroup"))) {
String dockerID = in.readLine();
- if (dockerID.equals("") == false) {
+ if (dockerID != null) {
int n = dockerID.indexOf("/");
String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
n = dockerID2.indexOf("/");
- dockerId = dockerID2.substring(n + 1, 12);
- }
- } catch (IOException e) {
- logger.error(e.getMessage());
- return;
- } catch (NullPointerException e2) {
- logger.error(e2.getMessage());
- return;
- } catch (Exception e3) {
- logger.error(e3.getMessage());
- return;
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e4) {
- logger.error(e4.getMessage());
+ if (dockerID2.length() > 12) {
+ dockerId = dockerID2.substring(n + 1, 12);
}
+ in.close();
}
+ } catch (Exception ex) {
+ logger.error(ex.toString());
}
}
}
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
index d375a457b..bea1f2419 100644
--- a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/send/SenderGroupTest.java
@@ -27,7 +27,7 @@ public class SenderGroupTest {
AuditConfig testConfig = new AuditConfig();
SenderManager testManager = new SenderManager(testConfig);
SenderHandler clientHandler = new org.apache.inlong.audit.send.SenderHandler(testManager);
- SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(10, clientHandler);
+ SenderGroup sender = new org.apache.inlong.audit.send.SenderGroup(testManager);
@Test
public void isHasSendError() {