You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/15 12:04:09 UTC

[incubator-eventmesh] branch protocol-amqp updated: Modify the implementation of maxFrame size

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 150fefbc Modify the implementation of maxFrame size
     new f7177a66 Merge pull request #1608 from wangshaojie4039/protocol-amqp
150fefbc is described below

commit 150fefbca0f7a98788adb21e8e58498500470d1e
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Sat Oct 15 20:02:04 2022 +0800

    Modify the implementation of maxFrame size
---
 .../runtime/boot/EventMeshAmqpServer.java          | 25 +++++++++++++++++++---
 .../core/protocol/amqp/remoting/AMQPFrame.java     | 20 ++++++++++-------
 .../amqp/remoting/codec/AmqpCodeDecoder.java       | 20 +++++++++--------
 3 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java
index a94aeace..9a82d325 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java
@@ -23,11 +23,14 @@ import io.netty.channel.*;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.amqp.metadata.MetaStore;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeDecoder;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeEncoder;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeService;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeServiceImpl;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueService;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueServiceImpl;
 import org.apache.eventmesh.runtime.registry.Registry;
-import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
 
 public class EventMeshAmqpServer extends AbstractRemotingServer {
 
@@ -37,12 +40,21 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
 
     private Registry registry;
 
+    private ExchangeService exchangeService;
+
+    private QueueService queueService;
+
+    private MetaStore metaStore;
+
     public EventMeshAmqpServer(EventMeshServer eventMeshServer,
                                EventMeshAmqpConfiguration eventMeshAmqpConfiguration, Registry registry) {
         super();
         this.eventMeshServer = eventMeshServer;
         this.eventMeshAmqpConfiguration = eventMeshAmqpConfiguration;
         this.registry = registry;
+        this.metaStore=new MetaStore(this);
+        this.exchangeService=new ExchangeServiceImpl(this,metaStore);
+        this.queueService=new QueueServiceImpl(this,metaStore);
     }
 
 
@@ -111,6 +123,13 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
         return eventMeshAmqpConfiguration;
     }
 
+    public ExchangeService getExchangeService() {
+        return exchangeService;
+    }
+
+    public QueueService getQueueService() {
+        return queueService;
+    }
 
     /**
      * A channel initializer that initialize channels for amqp protocol.
@@ -138,7 +157,7 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
                     new AmqpCodeEncoder());
             ch.pipeline().addLast("frameDecoder",
                     // TODO
-                    new AmqpCodeDecoder(eventMeshAmqpConfiguration.maxFrameSize));
+                    new AmqpCodeDecoder());
             // TODO: 2022/9/27 add AmqpConnection
 //            ch.pipeline().addLast("handler",
 //                    new AmqpConnection(amqpServer));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java
index 12721ce5..70f053a7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java
@@ -18,17 +18,15 @@
 package org.apache.eventmesh.runtime.core.protocol.amqp.remoting;
 
 
-import org.apache.eventmesh.runtime.core.protocol.amqp.exception.MalformedFrameException;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.Frame;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
+import org.apache.eventmesh.runtime.core.protocol.amqp.exception.MalformedFrameException;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.impl.Frame;
+import java.io.DataInputStream;
+import java.io.IOException;
 
 public class AMQPFrame implements AMQData {
 
@@ -44,7 +42,7 @@ public class AMQPFrame implements AMQData {
      */
     private final int channel;
 
-    private final ByteBuf payload;
+    private ByteBuf payload;
 
 
     public AMQPFrame(int type, int channel, ByteBuf payload) {
@@ -53,6 +51,12 @@ public class AMQPFrame implements AMQData {
         this.payload = payload;
     }
 
+    public AMQPFrame(int type, int channel) {
+        this.type = type;
+        this.channel = channel;
+    }
+
+
     public static AMQPFrame get(ByteBuf buf) throws IOException {
         int type = buf.readUnsignedByte();
         int channel = buf.readUnsignedShort();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java
index bfe24814..2508ded5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec;
 
+import io.netty.util.AttributeKey;
 import org.apache.eventmesh.runtime.core.protocol.amqp.exception.MalformedFrameException;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolFrame;
@@ -37,6 +38,10 @@ public class AmqpCodeDecoder extends AbstractBatchDecoder {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
+    public static final AttributeKey<Integer> ATTRIBUTE_KEY_MAX_FRAME_SIZE = AttributeKey.valueOf("maxFrameSize");
+
+
+
     /**
      * the length of protocol code
      */
@@ -47,12 +52,6 @@ public class AmqpCodeDecoder extends AbstractBatchDecoder {
      */
     private static final ProtocolVersion DEFAULT_PROTOCOL_VERSION = ProtocolVersion.v0_91;
 
-    protected int maxFrameSize;
-
-    public AmqpCodeDecoder(int maxFrameSize) {
-        super();
-        this.maxFrameSize = maxFrameSize;
-    }
 
     private boolean firstRead = true;
 
@@ -116,9 +115,12 @@ public class AmqpCodeDecoder extends AbstractBatchDecoder {
                 in.readUnsignedShort();
                 int payloadSize = in.readInt();
                 in.resetReaderIndex();
-                if (payloadSize > maxFrameSize) {
-                    throw new MalformedFrameException(
-                        "frame > maxFrameSize exception remoteAddress:" + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                if (ctx.channel().attr(ATTRIBUTE_KEY_MAX_FRAME_SIZE).get() != null) {
+                    int maxFrameSize = ctx.channel().attr(ATTRIBUTE_KEY_MAX_FRAME_SIZE).get();
+                    if (payloadSize > maxFrameSize) {
+                        throw new MalformedFrameException(
+                                "frame > maxFrameSize exception remoteAddress:" + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    }
                 }
 
                 if (in.readableBytes() >= (AMQPFrame.NON_BODY_SIZE + payloadSize)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org