You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/31 03:01:02 UTC

[inlong] 01/01: [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ce3bf56281667004421732c8df426b20721a1ef5
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Jul 30 21:32:53 2022 -0500

    [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)
---
 .../apache/inlong/tubemq/corerpc/RpcConstants.java |  3 -
 .../inlong/tubemq/corerpc/netty/NettyClient.java   |  2 -
 .../tubemq/corerpc/netty/NettyClientFactory.java   | 28 +------
 .../tubemq/corerpc/netty/NettyProtocolDecoder.java | 95 +++++++++++++++-------
 .../tubemq/corerpc/netty/NettyProtocolEncoder.java | 15 ++--
 .../tubemq/corerpc/netty/NettyRpcServer.java       |  6 --
 .../corerpc/netty/NettyProtocolEncoderTest.java    |  1 -
 7 files changed, 80 insertions(+), 70 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
index 393863a0d..4bdbd4f36 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
@@ -43,7 +43,6 @@ public final class RpcConstants {
     public static final String NETTY_WRITE_HIGH_MARK = "rpc.netty.write.highmark";
     public static final String NETTY_WRITE_LOW_MARK = "rpc.netty.write.lowmark";
     public static final String NETTY_TCP_SENDBUF = "rpc.netty.send.buffer";
-    public static final String NETTY_TCP_MAX_MESSAGE_SIZE = "rpc.netty.max.message.size";
     public static final String NETTY_TCP_RECEIVEBUF = "rpc.netty.receive.buffer";
     public static final String NETTY_TCP_ENABLEBUSYWAIT = "rpc.netty.enable.busy.wait";
 
@@ -133,6 +132,4 @@ public final class RpcConstants {
     public static final long CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS = 50000;
     public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_HIGH_MARK = 50 * 1024 * 1024;
     public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_LOW_MARK = 5 * 1024 * 1024;
-    public static final int CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
-
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
index 0faff9a05..30f330e75 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
@@ -285,9 +285,7 @@ public class NettyClient implements Client {
          */
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object e) {
-            logger.debug("client message receive!");
             if (e instanceof RpcDataPack) {
-                logger.debug("RpcDataPack client message receive!");
                 RpcDataPack dataPack = (RpcDataPack) e;
                 Callback callback = requests.remove(dataPack.getSerialNo());
                 if (callback != null) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
index eaa72721b..291a97137 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
@@ -25,15 +25,12 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,11 +58,8 @@ public class NettyClientFactory implements ClientFactory {
             new ConcurrentHashMap<>();
     protected AtomicBoolean shutdown = new AtomicBoolean(true);
     private EventLoopGroup eventLoopGroup;
-    private ExecutorService bossExecutorService;
-    private ExecutorService workerExecutorService;
     private AtomicInteger workerIdCounter = new AtomicInteger(0);
     // TSL encryption and need Two Way Authentic
-    private int maxMessageSize;
     private boolean enableTLS = false;
     private boolean needTwoWayAuthentic = false;
     private String keyStorePath;
@@ -87,8 +81,6 @@ public class NettyClientFactory implements ClientFactory {
         if (this.shutdown.compareAndSet(true, false)) {
             enableTLS = conf.getBoolean(RpcConstants.TLS_OVER_TCP, false);
             needTwoWayAuthentic = conf.getBoolean(RpcConstants.TLS_TWO_WAY_AUTHENTIC, false);
-            this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
-                    RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
             if (enableTLS) {
                 trustStorePath = conf.getString(RpcConstants.TLS_TRUSTSTORE_PATH);
                 trustStorePassword = conf.getString(RpcConstants.TLS_TRUSTSTORE_PASSWORD);
@@ -105,16 +97,9 @@ public class NettyClientFactory implements ClientFactory {
                 trustStorePath = null;
                 trustStorePassword = null;
             }
-            final int bossCount =
-                    conf.getInt(RpcConstants.BOSS_COUNT,
-                            RpcConstants.CFG_DEFAULT_BOSS_COUNT);
             final int workerCount =
                     conf.getInt(RpcConstants.WORKER_COUNT,
                             RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT);
-            final int callbackCount =
-                    conf.getInt(RpcConstants.CALLBACK_WORKER_COUNT, 3);
-            bossExecutorService = Executors.newCachedThreadPool();
-            workerExecutorService = Executors.newCachedThreadPool();
             String threadName = new StringBuilder(256)
                     .append(conf.getString(RpcConstants.WORKER_THREAD_NAME,
                             RpcConstants.CFG_DEFAULT_WORKER_THREAD_NAME))
@@ -202,11 +187,8 @@ public class NettyClientFactory implements ClientFactory {
                         }
                     }
                 }
-                if (this.bossExecutorService != null) {
-                    this.bossExecutorService.shutdown();
-                }
-                if (this.workerExecutorService != null) {
-                    this.workerExecutorService.shutdown();
+                if (this.eventLoopGroup != null && !eventLoopGroup.isShutdown()) {
+                    this.eventLoopGroup.shutdownGracefully();
                 }
             } catch (Exception e) {
                 logger.error("has exception ", e);
@@ -248,7 +230,8 @@ public class NettyClientFactory implements ClientFactory {
                     try {
                         SSLEngine sslEngine =
                                 TSSLEngineUtil.createSSLEngine(keyStorePath, trustStorePath,
-                                        keyStorePassword, trustStorePassword, true, needTwoWayAuthentic);
+                                        keyStorePassword, trustStorePassword, true,
+                                        needTwoWayAuthentic);
                         pipeline.addLast("ssl", new SslHandler(sslEngine));
                     } catch (Throwable t) {
                         logger.error(new StringBuilder(256)
@@ -257,9 +240,6 @@ public class NettyClientFactory implements ClientFactory {
                         throw new Exception(t);
                     }
                 }
-                socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize,
-                        0, 4, 0, 4));
-
                 // Encode the data
                 pipeline.addLast("protocolEncoder", new NettyProtocolEncoder());
                 // Decode the bytes into a Rpc Data Pack
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
index 238677076..9b88870c2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
@@ -20,9 +20,11 @@ package org.apache.inlong.tubemq.corerpc.netty;
 import static org.apache.inlong.tubemq.corebase.utils.AddressUtils.getRemoteAddressIP;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,50 +45,83 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
             new ConcurrentHashMap<>();
     private static AtomicLong lastProtolTime = new AtomicLong(0);
     private static AtomicLong lastSizeTime = new AtomicLong(0);
+    private boolean packHeaderRead = false;
+    private int listSize;
+    private List<RpcDataPack> rpcDataPackList = new ArrayList<>();
+    private RpcDataPack dataPack;
+    private ByteBuf lastByteBuf;
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
-        if (buffer.readableBytes() < 12) {
-            logger.warn("Decode buffer.readableBytes() < 12 !");
-            return;
-        }
-        int frameToken = buffer.readInt();
-        filterIllegalPkgToken(frameToken,
-                RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
-        int serialNo = buffer.readInt();
-        int tmpListSize = buffer.readInt();
-        filterIllegalPackageSize(true, tmpListSize,
-                RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
-        RpcDataPack dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>());
-        // get PackBody
-        int i = 0;
-        while (i < tmpListSize) {
-            i++;
+        buffer = convertToNewBuf(buffer);
+        while (buffer.readableBytes() > 0) {
+            if (!packHeaderRead) {
+                if (buffer.readableBytes() < 12) {
+                    saveRemainedByteBuf(buffer);
+                    break;
+                }
+                int frameToken = buffer.readInt();
+                filterIllegalPkgToken(frameToken, RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
+                int serialNo = buffer.readInt();
+                int tmpListSize = buffer.readInt();
+                filterIllegalPackageSize(true, tmpListSize,
+                        RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
+                this.listSize = tmpListSize;
+                this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
+                this.packHeaderRead = true;
+            }
+            // get PackBody
             if (buffer.readableBytes() < 4) {
-                logger.warn("Decode buffer.readableBytes() < 4 !");
+                saveRemainedByteBuf(buffer);
                 break;
             }
             buffer.markReaderIndex();
             int length = buffer.readInt();
-            filterIllegalPackageSize(false, length,
-                    RpcConstants.RPC_MAX_BUFFER_SIZE, ctx.channel());
+            if (buffer.readableBytes() < length) {
+                buffer.resetReaderIndex();
+                saveRemainedByteBuf(buffer);
+                break;
+            }
             ByteBuffer bb = ByteBuffer.allocate(length);
             buffer.readBytes(bb);
             bb.flip();
             dataPack.getDataLst().add(bb);
+            if (dataPack.getDataLst().size() == listSize) {
+                packHeaderRead = false;
+                rpcDataPackList.add(dataPack);
+            }
         }
+        if (rpcDataPackList.size() > 0) {
+            out.addAll(rpcDataPackList);
+            rpcDataPackList.clear();
+        }
+    }
+
+    private void saveRemainedByteBuf(ByteBuf byteBuf) {
+        if (byteBuf != null && byteBuf.readableBytes() > 0) {
+            lastByteBuf = Unpooled.copiedBuffer(byteBuf);
+        }
+    }
 
-        if (dataPack.getDataLst().size() == tmpListSize) {
-            out.add(dataPack);
-        } else {
-            logger.warn("Decode dataPack.getDataLst().size()[{}] != tmpListSize [{}] !",
-                    dataPack.getDataLst().size(), tmpListSize);
-            return;
+    private ByteBuf convertToNewBuf(ByteBuf byteBuf) {
+        ByteBuf newByteBuf = byteBuf;
+        int totalReadBytes = byteBuf.readableBytes();
+        if (lastByteBuf != null) {
+            try {
+                totalReadBytes += lastByteBuf.readableBytes();
+                newByteBuf = Unpooled.buffer(totalReadBytes);
+                newByteBuf.writeBytes(lastByteBuf);
+                newByteBuf.writeBytes(byteBuf);
+            } finally {
+                ReferenceCountUtil.release(lastByteBuf);
+            }
+            lastByteBuf = null;
         }
+        return newByteBuf;
     }
 
-    private void filterIllegalPkgToken(int inParamValue,
-                                       int allowTokenVal, Channel channel) throws UnknownProtocolException {
+    private void filterIllegalPkgToken(int inParamValue, int allowTokenVal,
+            Channel channel) throws UnknownProtocolException {
         if (inParamValue != allowTokenVal) {
             String rmtaddrIp = getRemoteAddressIP(channel);
             if (rmtaddrIp != null) {
@@ -103,7 +138,11 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
                 long curTime = System.currentTimeMillis();
                 if (curTime - befTime > 180000) {
                     if (lastProtolTime.compareAndSet(befTime, System.currentTimeMillis())) {
-                        logger.warn("[Abnormal Visit] OSS Tube visit list is :" + errProtolAddrMap.toString());
+                        logger.warn("[Abnormal Visit] OSS Tube  [inParamValue = {} vs "
+                                        + "allowTokenVal = {}] visit "
+                                        + "list is : {}",
+                                inParamValue, allowTokenVal,
+                                errProtolAddrMap.toString());
                         errProtolAddrMap.clear();
                     }
                 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
index fde7b523a..3f52d35bf 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.inlong.tubemq.corerpc.RpcConstants;
@@ -38,19 +39,17 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
     @Override
     protected void encode(ChannelHandlerContext chx, RpcDataPack msg, List<Object> out) {
         RpcDataPack dataPack = msg;
-        List<ByteBuffer> origs = dataPack.getDataLst();
-        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
-        try {
+        try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) {
             byteOut.write(getPackHeader(dataPack).array());
+            List<ByteBuffer> origs = dataPack.getDataLst();
             Iterator<ByteBuffer> iter = origs.iterator();
             while (iter.hasNext()) {
                 ByteBuffer entry = iter.next();
                 byteOut.write(getLengthHeader(entry).array());
-                byteOut.write(entry.array());
+                byteOut.write(getLengthBody(entry));
             }
             byte[] body = byteOut.toByteArray();
-            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + body.length);
-            buf.writeInt(body.length);
+            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(body.length);
             buf.writeBytes(body);
             out.add(buf);
         } catch (IOException e) {
@@ -73,4 +72,8 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
         header.flip();
         return header;
     }
+
+    private byte[] getLengthBody(ByteBuffer buf) {
+        return Arrays.copyOf(buf.array(), buf.limit());
+    }
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
index f21e3ea62..5f19d22e8 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
@@ -28,7 +28,6 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.DataOutputStream;
@@ -82,7 +81,6 @@ public class NettyRpcServer implements ServiceRpcServer {
     private boolean needTwoWayAuthentic = false;
     private String trustStorePath = "";
     private String trustStorePassword = "";
-    private int maxMessageSize;
 
     /**
      * create a server with rpc config info
@@ -115,8 +113,6 @@ public class NettyRpcServer implements ServiceRpcServer {
             }
         }
         this.enableBusyWait = conf.getBoolean(RpcConstants.NETTY_TCP_ENABLEBUSYWAIT, false);
-        this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
-                RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
         int bossCount =
                 conf.getInt(RpcConstants.BOSS_COUNT,
                         RpcConstants.CFG_DEFAULT_BOSS_COUNT);
@@ -172,8 +168,6 @@ public class NettyRpcServer implements ServiceRpcServer {
                         System.exit(1);
                     }
                 }
-                socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-                        maxMessageSize, 0, 4, 0, 4));
                 // Encode the data handler
                 socketChannel.pipeline().addLast("protocolEncoder", new NettyProtocolDecoder());
                 // Decode the bytes into a Rpc Data Pack
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
index 07b44a819..af61305e2 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
@@ -51,7 +51,6 @@ public class NettyProtocolEncoderTest {
             // read data.
             int i = buf.readInt();
             i = buf.readInt();
-            i = buf.readInt();
             Assert.assertEquals(123, i);
         } catch (Exception e) {
             e.printStackTrace();