You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/31 03:01:01 UTC

[inlong] branch master updated (876f247ed -> ce3bf5628)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


    omit 876f247ed [INLONG-5097][TubeMQ]Fix the protocol from 1.1.0 and later is not smoothly compatible with previous versions (#5214)
     new ce3bf5628 [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (876f247ed)
            \
             N -- N -- N   refs/heads/master (ce3bf5628)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[inlong] 01/01: [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ce3bf56281667004421732c8df426b20721a1ef5
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Jul 30 21:32:53 2022 -0500

    [INLONG-5097][TubeMQ] Keep the protocol compatible with previous versions (#5214)
---
 .../apache/inlong/tubemq/corerpc/RpcConstants.java |  3 -
 .../inlong/tubemq/corerpc/netty/NettyClient.java   |  2 -
 .../tubemq/corerpc/netty/NettyClientFactory.java   | 28 +------
 .../tubemq/corerpc/netty/NettyProtocolDecoder.java | 95 +++++++++++++++-------
 .../tubemq/corerpc/netty/NettyProtocolEncoder.java | 15 ++--
 .../tubemq/corerpc/netty/NettyRpcServer.java       |  6 --
 .../corerpc/netty/NettyProtocolEncoderTest.java    |  1 -
 7 files changed, 80 insertions(+), 70 deletions(-)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
index 393863a0d..4bdbd4f36 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcConstants.java
@@ -43,7 +43,6 @@ public final class RpcConstants {
     public static final String NETTY_WRITE_HIGH_MARK = "rpc.netty.write.highmark";
     public static final String NETTY_WRITE_LOW_MARK = "rpc.netty.write.lowmark";
     public static final String NETTY_TCP_SENDBUF = "rpc.netty.send.buffer";
-    public static final String NETTY_TCP_MAX_MESSAGE_SIZE = "rpc.netty.max.message.size";
     public static final String NETTY_TCP_RECEIVEBUF = "rpc.netty.receive.buffer";
     public static final String NETTY_TCP_ENABLEBUSYWAIT = "rpc.netty.enable.busy.wait";
 
@@ -133,6 +132,4 @@ public final class RpcConstants {
     public static final long CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS = 50000;
     public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_HIGH_MARK = 50 * 1024 * 1024;
     public static final long CFG_DEFAULT_NETTY_WRITEBUFFER_LOW_MARK = 5 * 1024 * 1024;
-    public static final int CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE = 5 * 1024 * 1024;
-
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
index 0faff9a05..30f330e75 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
@@ -285,9 +285,7 @@ public class NettyClient implements Client {
          */
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object e) {
-            logger.debug("client message receive!");
             if (e instanceof RpcDataPack) {
-                logger.debug("RpcDataPack client message receive!");
                 RpcDataPack dataPack = (RpcDataPack) e;
                 Callback callback = requests.remove(dataPack.getSerialNo());
                 if (callback != null) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
index eaa72721b..291a97137 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
@@ -25,15 +25,12 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,11 +58,8 @@ public class NettyClientFactory implements ClientFactory {
             new ConcurrentHashMap<>();
     protected AtomicBoolean shutdown = new AtomicBoolean(true);
     private EventLoopGroup eventLoopGroup;
-    private ExecutorService bossExecutorService;
-    private ExecutorService workerExecutorService;
     private AtomicInteger workerIdCounter = new AtomicInteger(0);
     // TSL encryption and need Two Way Authentic
-    private int maxMessageSize;
     private boolean enableTLS = false;
     private boolean needTwoWayAuthentic = false;
     private String keyStorePath;
@@ -87,8 +81,6 @@ public class NettyClientFactory implements ClientFactory {
         if (this.shutdown.compareAndSet(true, false)) {
             enableTLS = conf.getBoolean(RpcConstants.TLS_OVER_TCP, false);
             needTwoWayAuthentic = conf.getBoolean(RpcConstants.TLS_TWO_WAY_AUTHENTIC, false);
-            this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
-                    RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
             if (enableTLS) {
                 trustStorePath = conf.getString(RpcConstants.TLS_TRUSTSTORE_PATH);
                 trustStorePassword = conf.getString(RpcConstants.TLS_TRUSTSTORE_PASSWORD);
@@ -105,16 +97,9 @@ public class NettyClientFactory implements ClientFactory {
                 trustStorePath = null;
                 trustStorePassword = null;
             }
-            final int bossCount =
-                    conf.getInt(RpcConstants.BOSS_COUNT,
-                            RpcConstants.CFG_DEFAULT_BOSS_COUNT);
             final int workerCount =
                     conf.getInt(RpcConstants.WORKER_COUNT,
                             RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT);
-            final int callbackCount =
-                    conf.getInt(RpcConstants.CALLBACK_WORKER_COUNT, 3);
-            bossExecutorService = Executors.newCachedThreadPool();
-            workerExecutorService = Executors.newCachedThreadPool();
             String threadName = new StringBuilder(256)
                     .append(conf.getString(RpcConstants.WORKER_THREAD_NAME,
                             RpcConstants.CFG_DEFAULT_WORKER_THREAD_NAME))
@@ -202,11 +187,8 @@ public class NettyClientFactory implements ClientFactory {
                         }
                     }
                 }
-                if (this.bossExecutorService != null) {
-                    this.bossExecutorService.shutdown();
-                }
-                if (this.workerExecutorService != null) {
-                    this.workerExecutorService.shutdown();
+                if (this.eventLoopGroup != null && !eventLoopGroup.isShutdown()) {
+                    this.eventLoopGroup.shutdownGracefully();
                 }
             } catch (Exception e) {
                 logger.error("has exception ", e);
@@ -248,7 +230,8 @@ public class NettyClientFactory implements ClientFactory {
                     try {
                         SSLEngine sslEngine =
                                 TSSLEngineUtil.createSSLEngine(keyStorePath, trustStorePath,
-                                        keyStorePassword, trustStorePassword, true, needTwoWayAuthentic);
+                                        keyStorePassword, trustStorePassword, true,
+                                        needTwoWayAuthentic);
                         pipeline.addLast("ssl", new SslHandler(sslEngine));
                     } catch (Throwable t) {
                         logger.error(new StringBuilder(256)
@@ -257,9 +240,6 @@ public class NettyClientFactory implements ClientFactory {
                         throw new Exception(t);
                     }
                 }
-                socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxMessageSize,
-                        0, 4, 0, 4));
-
                 // Encode the data
                 pipeline.addLast("protocolEncoder", new NettyProtocolEncoder());
                 // Decode the bytes into a Rpc Data Pack
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
index 238677076..9b88870c2 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolDecoder.java
@@ -20,9 +20,11 @@ package org.apache.inlong.tubemq.corerpc.netty;
 import static org.apache.inlong.tubemq.corebase.utils.AddressUtils.getRemoteAddressIP;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,50 +45,83 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
             new ConcurrentHashMap<>();
     private static AtomicLong lastProtolTime = new AtomicLong(0);
     private static AtomicLong lastSizeTime = new AtomicLong(0);
+    private boolean packHeaderRead = false;
+    private int listSize;
+    private List<RpcDataPack> rpcDataPackList = new ArrayList<>();
+    private RpcDataPack dataPack;
+    private ByteBuf lastByteBuf;
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
-        if (buffer.readableBytes() < 12) {
-            logger.warn("Decode buffer.readableBytes() < 12 !");
-            return;
-        }
-        int frameToken = buffer.readInt();
-        filterIllegalPkgToken(frameToken,
-                RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
-        int serialNo = buffer.readInt();
-        int tmpListSize = buffer.readInt();
-        filterIllegalPackageSize(true, tmpListSize,
-                RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
-        RpcDataPack dataPack = new RpcDataPack(serialNo, new ArrayList<ByteBuffer>());
-        // get PackBody
-        int i = 0;
-        while (i < tmpListSize) {
-            i++;
+        buffer = convertToNewBuf(buffer);
+        while (buffer.readableBytes() > 0) {
+            if (!packHeaderRead) {
+                if (buffer.readableBytes() < 12) {
+                    saveRemainedByteBuf(buffer);
+                    break;
+                }
+                int frameToken = buffer.readInt();
+                filterIllegalPkgToken(frameToken, RpcConstants.RPC_PROTOCOL_BEGIN_TOKEN, ctx.channel());
+                int serialNo = buffer.readInt();
+                int tmpListSize = buffer.readInt();
+                filterIllegalPackageSize(true, tmpListSize,
+                        RpcConstants.MAX_FRAME_MAX_LIST_SIZE, ctx.channel());
+                this.listSize = tmpListSize;
+                this.dataPack = new RpcDataPack(serialNo, new ArrayList<>(this.listSize));
+                this.packHeaderRead = true;
+            }
+            // get PackBody
             if (buffer.readableBytes() < 4) {
-                logger.warn("Decode buffer.readableBytes() < 4 !");
+                saveRemainedByteBuf(buffer);
                 break;
             }
             buffer.markReaderIndex();
             int length = buffer.readInt();
-            filterIllegalPackageSize(false, length,
-                    RpcConstants.RPC_MAX_BUFFER_SIZE, ctx.channel());
+            if (buffer.readableBytes() < length) {
+                buffer.resetReaderIndex();
+                saveRemainedByteBuf(buffer);
+                break;
+            }
             ByteBuffer bb = ByteBuffer.allocate(length);
             buffer.readBytes(bb);
             bb.flip();
             dataPack.getDataLst().add(bb);
+            if (dataPack.getDataLst().size() == listSize) {
+                packHeaderRead = false;
+                rpcDataPackList.add(dataPack);
+            }
         }
+        if (rpcDataPackList.size() > 0) {
+            out.addAll(rpcDataPackList);
+            rpcDataPackList.clear();
+        }
+    }
+
+    private void saveRemainedByteBuf(ByteBuf byteBuf) {
+        if (byteBuf != null && byteBuf.readableBytes() > 0) {
+            lastByteBuf = Unpooled.copiedBuffer(byteBuf);
+        }
+    }
 
-        if (dataPack.getDataLst().size() == tmpListSize) {
-            out.add(dataPack);
-        } else {
-            logger.warn("Decode dataPack.getDataLst().size()[{}] != tmpListSize [{}] !",
-                    dataPack.getDataLst().size(), tmpListSize);
-            return;
+    private ByteBuf convertToNewBuf(ByteBuf byteBuf) {
+        ByteBuf newByteBuf = byteBuf;
+        int totalReadBytes = byteBuf.readableBytes();
+        if (lastByteBuf != null) {
+            try {
+                totalReadBytes += lastByteBuf.readableBytes();
+                newByteBuf = Unpooled.buffer(totalReadBytes);
+                newByteBuf.writeBytes(lastByteBuf);
+                newByteBuf.writeBytes(byteBuf);
+            } finally {
+                ReferenceCountUtil.release(lastByteBuf);
+            }
+            lastByteBuf = null;
         }
+        return newByteBuf;
     }
 
-    private void filterIllegalPkgToken(int inParamValue,
-                                       int allowTokenVal, Channel channel) throws UnknownProtocolException {
+    private void filterIllegalPkgToken(int inParamValue, int allowTokenVal,
+            Channel channel) throws UnknownProtocolException {
         if (inParamValue != allowTokenVal) {
             String rmtaddrIp = getRemoteAddressIP(channel);
             if (rmtaddrIp != null) {
@@ -103,7 +138,11 @@ public class NettyProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
                 long curTime = System.currentTimeMillis();
                 if (curTime - befTime > 180000) {
                     if (lastProtolTime.compareAndSet(befTime, System.currentTimeMillis())) {
-                        logger.warn("[Abnormal Visit] OSS Tube visit list is :" + errProtolAddrMap.toString());
+                        logger.warn("[Abnormal Visit] OSS Tube  [inParamValue = {} vs "
+                                        + "allowTokenVal = {}] visit "
+                                        + "list is : {}",
+                                inParamValue, allowTokenVal,
+                                errProtolAddrMap.toString());
                         errProtolAddrMap.clear();
                     }
                 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
index fde7b523a..3f52d35bf 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoder.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.inlong.tubemq.corerpc.RpcConstants;
@@ -38,19 +39,17 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
     @Override
     protected void encode(ChannelHandlerContext chx, RpcDataPack msg, List<Object> out) {
         RpcDataPack dataPack = msg;
-        List<ByteBuffer> origs = dataPack.getDataLst();
-        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
-        try {
+        try (ByteArrayOutputStream byteOut = new ByteArrayOutputStream()) {
             byteOut.write(getPackHeader(dataPack).array());
+            List<ByteBuffer> origs = dataPack.getDataLst();
             Iterator<ByteBuffer> iter = origs.iterator();
             while (iter.hasNext()) {
                 ByteBuffer entry = iter.next();
                 byteOut.write(getLengthHeader(entry).array());
-                byteOut.write(entry.array());
+                byteOut.write(getLengthBody(entry));
             }
             byte[] body = byteOut.toByteArray();
-            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(4 + body.length);
-            buf.writeInt(body.length);
+            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(body.length);
             buf.writeBytes(body);
             out.add(buf);
         } catch (IOException e) {
@@ -73,4 +72,8 @@ public class NettyProtocolEncoder extends MessageToMessageEncoder<RpcDataPack> {
         header.flip();
         return header;
     }
+
+    private byte[] getLengthBody(ByteBuffer buf) {
+        return Arrays.copyOf(buf.array(), buf.limit());
+    }
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
index f21e3ea62..5f19d22e8 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyRpcServer.java
@@ -28,7 +28,6 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.DataOutputStream;
@@ -82,7 +81,6 @@ public class NettyRpcServer implements ServiceRpcServer {
     private boolean needTwoWayAuthentic = false;
     private String trustStorePath = "";
     private String trustStorePassword = "";
-    private int maxMessageSize;
 
     /**
      * create a server with rpc config info
@@ -115,8 +113,6 @@ public class NettyRpcServer implements ServiceRpcServer {
             }
         }
         this.enableBusyWait = conf.getBoolean(RpcConstants.NETTY_TCP_ENABLEBUSYWAIT, false);
-        this.maxMessageSize = conf.getInt(RpcConstants.NETTY_TCP_MAX_MESSAGE_SIZE,
-                RpcConstants.CFG_DEFAULT_NETTY_TCP_MAX_MESSAGE_SIZE);
         int bossCount =
                 conf.getInt(RpcConstants.BOSS_COUNT,
                         RpcConstants.CFG_DEFAULT_BOSS_COUNT);
@@ -172,8 +168,6 @@ public class NettyRpcServer implements ServiceRpcServer {
                         System.exit(1);
                     }
                 }
-                socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
-                        maxMessageSize, 0, 4, 0, 4));
                 // Encode the data handler
                 socketChannel.pipeline().addLast("protocolEncoder", new NettyProtocolDecoder());
                 // Decode the bytes into a Rpc Data Pack
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
index 07b44a819..af61305e2 100644
--- a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corerpc/netty/NettyProtocolEncoderTest.java
@@ -51,7 +51,6 @@ public class NettyProtocolEncoderTest {
             // read data.
             int i = buf.readInt();
             i = buf.readInt();
-            i = buf.readInt();
             Assert.assertEquals(123, i);
         } catch (Exception e) {
             e.printStackTrace();