You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/31 03:01:02 UTC
[inlong] 01/01: [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ce3bf56281667004421732c8df426b20721a1ef5
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Jul 30 21:32:53 2022 -0500
[INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)
---
.../apache/inlong/tubemq/corerpc/RpcConstants.java | 3 -
.../inlong/tubemq/corerpc/netty/NettyClient.java | 2 -
.../tubemq/corerpc/netty/NettyClientFactory.java | 28 +------
.../tubemq/corerpc/netty/NettyProtocolDecoder.java | 95 +++++++++++++++-------
.../tubemq/corerpc/netty/NettyProtocolEncoder.java | 15 ++--
.../tubemq/corerpc/netty/NettyRpcServer.java | 6 --
.../corerpc/netty/NettyProtocolEncoderTest.java | 1 -
7 files changed, 80 insertions(+), 70 deletions(-)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
index 393863a0d..4bdbd4f36 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
@@ -43,7 +43,6 @@ public final class RpcConstants {
public static final String NETTY_WRITE_HIGH_MARK = "rpc.netty.write.highmark";
public static final String NETTY_WRITE_LOW_MARK = "rpc.netty.write.lowmark";
public static final String NETTY_TCP_SENDBUF = "rpc.netty.send.buffer";
- public static final String NETTY_TCP_MAX_MESSAGE_SIZE = "rpc.netty.max.message.size";
public static final String NETTY_TCP_RECEIVEBUF = "rpc.netty.receive.buffer";
public static final String NETTY_TCP_ENABLEBUSYWAIT = "rpc.netty.enable.busy.wait";
@@ -133,6 +132,4 @@ public final class RpcConstants {
public static final long CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS = 50000;
public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_HIGH_MARK = 50 * 1024 * 1024;
public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_LOW_MARK = 5 * 1024 * 1024;
- public static final int CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
-
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
index 0faff9a05..30f330e75 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
@@ -285,9 +285,7 @@ public class NettyClient implements Client {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object e) {
- logger.debug("client message receive!");
if (e instanceof RpcDataPack) {
- logger.debug("RpcDataPack client message receive!");
RpcDataPack dataPack = (RpcDataPack) e;
Callback callback = requests.remove(dataPack.getSerialNo());
if (callback != null) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
index eaa72721b..291a97137 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
@@ -25,15 +25,12 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,11 +58,8 @@ public class NettyClientFactory implements ClientFactory {
new ConcurrentHashMap<>();
protected AtomicBoolean shutdown = new AtomicBoolean(true);
private EventLoopGroup eventLoopGroup;
- private ExecutorService bossExecutorService;
- private ExecutorService workerExecutorService;
private AtomicInteger workerIdCounter = new AtomicInteger(0);
// TSL encryption and need Two Way Authentic
- private int maxMessageSize;
private boolean enableTLS = false;
private boolean needTwoWayAuthentic = false;
private String keyStorePath;
@@ -87,8 +81,6 @@ public class NettyClientFactory implements ClientFactory {
if (this.shutdown.compareAndSet(true, false)) {
enableTLS = conf.getBoolean(RpcConstants.TLS_OVER_TCP, false);
needTwoWayAuthentic = conf.getBoolean(RpcConstants.TLS_TWO_WAY_AUTHENTIC, false);
- this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
- RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
if (enableTLS) {
trustStorePath = conf.getString(RpcConstants.TLS_TRUSTSTORE_PATH);
trustStorePassword = conf.getString(RpcConstants.TLS_TRUSTSTORE_PASSWORD);
@@ -105,16 +97,9 @@ public class NettyClientFactory implements ClientFactory {
trustStorePath = null;
trustStorePassword = null;
}
- final int bossCount =
- conf.getInt(RpcConstants.BOSS_COUNT,
- RpcConstants.CFG_DEFAULT_BOSS_COUNT);
final int workerCount =
conf.getInt(RpcConstants.WORKER_COUNT,
RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT);
- final int callbackCount =
- conf.getInt(RpcConstants.CALLBACK_WORKER_COUNT, 3);
- bossExecutorService = Executors.newCachedThreadPool();
- workerExecutorService = Executors.newCachedThreadPool();
String threadName = new StringBuilder(256)
.append(conf.getString(RpcConstants.WORKER_THREAD_NAME,
RpcConstants.CFG_DEFAULT_WORKER_THREAD_NAME))
@@ -202,11 +187,8 @@ public class NettyClientFactory implements ClientFactory {
}
}
}
- if (this.bossExecutorService != null) {
- this.bossExecutorService.shutdown();
- }
- if (this.workerExecutorService != null) {
- this.workerExecutorService.shutdown();
+ if (this.eventLoopGroup != null && !eventLoopGroup.isShutdown()) {
+ this.eventLoopGroup.shutdownGracefully();
}
} catch (Exception e) {
logger.error("has exception ", e);
@@ -248,7 +230,8 @@ public class NettyClientFactory implements ClientFactory {
try {
SSLEngine sslEngine =
TSSLEngineUtil.createSSLEngine(keyStorePath, trustStorePath,
- keyStorePassword, trustStorePassword, true, needTwoWayAuthentic);
+ keyStorePassword, trustStorePassword, true,
+ needTwoWayAuthentic);
pipeline.addLast("ssl", new SslHandler(sslEngine));
} catch (Throwable t) {
logger.error(new StringBuilder(256)
@@ -257,9 +240,6 @@ public class NettyClientFactory implements ClientFactory {
throw new Exception(t);
}
}
- socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize,
- 0, 4, 0, 4));
-
// Encode the data
pipeline.addLast("protocolEncoder", new NettyProtocolEncoder());
// Decode the bytes into a Rpc Data Pack
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
index 238677076..9b88870c2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
@@ -20,9 +20,11 @@ package org.apache.inlong.tubemq.corerpc.netty;
import static org.apache.inlong.tubemq.corebase.utils.AddressUtils.getRemoteAddressIP;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -43,50 +45,83 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
new ConcurrentHashMap<>();
private static AtomicLong lastProtolTime = new AtomicLong(0);
private static AtomicLong lastSizeTime = new AtomicLong(0);
+ private boolean packHeaderRead = false;
+ private int listSize;
+ private List<RpcDataPack> rpcDataPackList = new ArrayList<>();
+ private RpcDataPack dataPack;
+ private ByteBuf lastByteBuf;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
- if (buffer.readableBytes() < 12) {
- logger.warn("Decode buffer.readableBytes() < 12 !");
- return;
- }
- int frameToken = buffer.readInt();
- filterIllegalPkgToken(frameToken,
- RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
- int serialNo = buffer.readInt();
- int tmpListSize = buffer.readInt();
- filterIllegalPackageSize(true, tmpListSize,
- RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
- RpcDataPack dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>());
- // get PackBody
- int i = 0;
- while (i < tmpListSize) {
- i++;
+ buffer = convertToNewBuf(buffer);
+ while (buffer.readableBytes() > 0) {
+ if (!packHeaderRead) {
+ if (buffer.readableBytes() < 12) {
+ saveRemainedByteBuf(buffer);
+ break;
+ }
+ int frameToken = buffer.readInt();
+ filterIllegalPkgToken(frameToken, RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
+ int serialNo = buffer.readInt();
+ int tmpListSize = buffer.readInt();
+ filterIllegalPackageSize(true, tmpListSize,
+ RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
+ this.listSize = tmpListSize;
+ this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
+ this.packHeaderRead = true;
+ }
+ // get PackBody
if (buffer.readableBytes() < 4) {
- logger.warn("Decode buffer.readableBytes() < 4 !");
+ saveRemainedByteBuf(buffer);
break;
}
buffer.markReaderIndex();
int length = buffer.readInt();
- filterIllegalPackageSize(false, length,
- RpcConstants.RPC_MAX_BUFFER_SIZE, ctx.channel());
+ if (buffer.readableBytes() < length) {
+ buffer.resetReaderIndex();
+ saveRemainedByteBuf(buffer);
+ break;
+ }
ByteBuffer bb = ByteBuffer.allocate(length);
buffer.readBytes(bb);
bb.flip();
dataPack.getDataLst().add(bb);
+ if (dataPack.getDataLst().size() == listSize) {
+ packHeaderRead = false;
+ rpcDataPackList.add(dataPack);
+ }
}
+ if (rpcDataPackList.size() > 0) {
+ out.addAll(rpcDataPackList);
+ rpcDataPackList.clear();
+ }
+ }
+
+ private void saveRemainedByteBuf(ByteBuf byteBuf) {
+ if (byteBuf != null && byteBuf.readableBytes() > 0) {
+ lastByteBuf = Unpooled.copiedBuffer(byteBuf);
+ }
+ }
- if (dataPack.getDataLst().size() == tmpListSize) {
- out.add(dataPack);
- } else {
- logger.warn("Decode dataPack.getDataLst().size()[{}] != tmpListSize [{}] !",
- dataPack.getDataLst().size(), tmpListSize);
- return;
+ private ByteBuf convertToNewBuf(ByteBuf byteBuf) {
+ ByteBuf newByteBuf = byteBuf;
+ int totalReadBytes = byteBuf.readableBytes();
+ if (lastByteBuf != null) {
+ try {
+ totalReadBytes += lastByteBuf.readableBytes();
+ newByteBuf = Unpooled.buffer(totalReadBytes);
+ newByteBuf.writeBytes(lastByteBuf);
+ newByteBuf.writeBytes(byteBuf);
+ } finally {
+ ReferenceCountUtil.release(lastByteBuf);
+ }
+ lastByteBuf = null;
}
+ return newByteBuf;
}
- private void filterIllegalPkgToken(int inParamValue,
- int allowTokenVal, Channel channel) throws UnknownProtocolException {
+ private void filterIllegalPkgToken(int inParamValue, int allowTokenVal,
+ Channel channel) throws UnknownProtocolException {
if (inParamValue != allowTokenVal) {
String rmtaddrIp = getRemoteAddressIP(channel);
if (rmtaddrIp != null) {
@@ -103,7 +138,11 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
long curTime = System.currentTimeMillis();
if (curTime - befTime > 180000) {
if (lastProtolTime.compareAndSet(befTime, System.currentTimeMillis())) {
- logger.warn("[Abnormal Visit] OSS Tube visit list is :" + errProtolAddrMap.toString());
+ logger.warn("[Abnormal Visit] OSS Tube [inParamValue = {} vs "
+ + "allowTokenVal = {}] visit "
+ + "list is : {}",
+ inParamValue, allowTokenVal,
+ errProtolAddrMap.toString());
errProtolAddrMap.clear();
}
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
index fde7b523a..3f52d35bf 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.inlong.tubemq.corerpc.RpcConstants;
@@ -38,19 +39,17 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
@Override
protected void encode(ChannelHandlerContext chx, RpcDataPack msg, List<Object> out) {
RpcDataPack dataPack = msg;
- List<ByteBuffer> origs = dataPack.getDataLst();
- ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
- try {
+ try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) {
byteOut.write(getPackHeader(dataPack).array());
+ List<ByteBuffer> origs = dataPack.getDataLst();
Iterator<ByteBuffer> iter = origs.iterator();
while (iter.hasNext()) {
ByteBuffer entry = iter.next();
byteOut.write(getLengthHeader(entry).array());
- byteOut.write(entry.array());
+ byteOut.write(getLengthBody(entry));
}
byte[] body = byteOut.toByteArray();
- ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + body.length);
- buf.writeInt(body.length);
+ ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(body.length);
buf.writeBytes(body);
out.add(buf);
} catch (IOException e) {
@@ -73,4 +72,8 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
header.flip();
return header;
}
+
+ private byte[] getLengthBody(ByteBuffer buf) {
+ return Arrays.copyOf(buf.array(), buf.limit());
+ }
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
index f21e3ea62..5f19d22e8 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
@@ -28,7 +28,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.DataOutputStream;
@@ -82,7 +81,6 @@ public class NettyRpcServer implements ServiceRpcServer {
private boolean needTwoWayAuthentic = false;
private String trustStorePath = "";
private String trustStorePassword = "";
- private int maxMessageSize;
/**
* create a server with rpc config info
@@ -115,8 +113,6 @@ public class NettyRpcServer implements ServiceRpcServer {
}
}
this.enableBusyWait = conf.getBoolean(RpcConstants.NETTY_TCP_ENABLEBUSYWAIT, false);
- this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
- RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
int bossCount =
conf.getInt(RpcConstants.BOSS_COUNT,
RpcConstants.CFG_DEFAULT_BOSS_COUNT);
@@ -172,8 +168,6 @@ public class NettyRpcServer implements ServiceRpcServer {
System.exit(1);
}
}
- socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
- maxMessageSize, 0, 4, 0, 4));
// Encode the data handler
socketChannel.pipeline().addLast("protocolEncoder", new NettyProtocolDecoder());
// Decode the bytes into a Rpc Data Pack
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
index 07b44a819..af61305e2 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
@@ -51,7 +51,6 @@ public class NettyProtocolEncoderTest {
// read data.
int i = buf.readInt();
i = buf.readInt();
- i = buf.readInt();
Assert.assertEquals(123, i);
} catch (Exception e) {
e.printStackTrace();