You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/15 12:04:09 UTC
[incubator-eventmesh] branch protocol-amqp updated: Modify the implementation of maxFrame size
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 150fefbc Modify the implementation of maxFrame size
new f7177a66 Merge pull request #1608 from wangshaojie4039/protocol-amqp
150fefbc is described below
commit 150fefbca0f7a98788adb21e8e58498500470d1e
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Sat Oct 15 20:02:04 2022 +0800
Modify the implementation of maxFrame size
---
.../runtime/boot/EventMeshAmqpServer.java | 25 +++++++++++++++++++---
.../core/protocol/amqp/remoting/AMQPFrame.java | 20 ++++++++++-------
.../amqp/remoting/codec/AmqpCodeDecoder.java | 20 +++++++++--------
3 files changed, 45 insertions(+), 20 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java
index a94aeace..9a82d325 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java
@@ -23,11 +23,14 @@ import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.amqp.metadata.MetaStore;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeDecoder;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeEncoder;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeService;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeServiceImpl;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueService;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueServiceImpl;
import org.apache.eventmesh.runtime.registry.Registry;
-import org.apache.eventmesh.webhook.admin.AdminWebHookConfigOperationManage;
public class EventMeshAmqpServer extends AbstractRemotingServer {
@@ -37,12 +40,21 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
private Registry registry;
+ private ExchangeService exchangeService;
+
+ private QueueService queueService;
+
+ private MetaStore metaStore;
+
public EventMeshAmqpServer(EventMeshServer eventMeshServer,
EventMeshAmqpConfiguration eventMeshAmqpConfiguration, Registry registry) {
super();
this.eventMeshServer = eventMeshServer;
this.eventMeshAmqpConfiguration = eventMeshAmqpConfiguration;
this.registry = registry;
+ this.metaStore=new MetaStore(this);
+ this.exchangeService=new ExchangeServiceImpl(this,metaStore);
+ this.queueService=new QueueServiceImpl(this,metaStore);
}
@@ -111,6 +123,13 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
return eventMeshAmqpConfiguration;
}
+ public ExchangeService getExchangeService() {
+ return exchangeService;
+ }
+
+ public QueueService getQueueService() {
+ return queueService;
+ }
/**
* A channel initializer that initialize channels for amqp protocol.
@@ -138,7 +157,7 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
new AmqpCodeEncoder());
ch.pipeline().addLast("frameDecoder",
// TODO
- new AmqpCodeDecoder(eventMeshAmqpConfiguration.maxFrameSize));
+ new AmqpCodeDecoder());
// TODO: 2022/9/27 add AmqpConnection
// ch.pipeline().addLast("handler",
// new AmqpConnection(amqpServer));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java
index 12721ce5..70f053a7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AMQPFrame.java
@@ -18,17 +18,15 @@
package org.apache.eventmesh.runtime.core.protocol.amqp.remoting;
-import org.apache.eventmesh.runtime.core.protocol.amqp.exception.MalformedFrameException;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.Frame;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
+import org.apache.eventmesh.runtime.core.protocol.amqp.exception.MalformedFrameException;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.impl.Frame;
+import java.io.DataInputStream;
+import java.io.IOException;
public class AMQPFrame implements AMQData {
@@ -44,7 +42,7 @@ public class AMQPFrame implements AMQData {
*/
private final int channel;
- private final ByteBuf payload;
+ private ByteBuf payload;
public AMQPFrame(int type, int channel, ByteBuf payload) {
@@ -53,6 +51,12 @@ public class AMQPFrame implements AMQData {
this.payload = payload;
}
+ public AMQPFrame(int type, int channel) {
+ this.type = type;
+ this.channel = channel;
+ }
+
+
public static AMQPFrame get(ByteBuf buf) throws IOException {
int type = buf.readUnsignedByte();
int channel = buf.readUnsignedShort();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java
index bfe24814..2508ded5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/codec/AmqpCodeDecoder.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec;
+import io.netty.util.AttributeKey;
import org.apache.eventmesh.runtime.core.protocol.amqp.exception.MalformedFrameException;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolFrame;
@@ -37,6 +38,10 @@ public class AmqpCodeDecoder extends AbstractBatchDecoder {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ public static final AttributeKey<Integer> ATTRIBUTE_KEY_MAX_FRAME_SIZE = AttributeKey.valueOf("maxFrameSize");
+
+
+
/**
* the length of protocol code
*/
@@ -47,12 +52,6 @@ public class AmqpCodeDecoder extends AbstractBatchDecoder {
*/
private static final ProtocolVersion DEFAULT_PROTOCOL_VERSION = ProtocolVersion.v0_91;
- protected int maxFrameSize;
-
- public AmqpCodeDecoder(int maxFrameSize) {
- super();
- this.maxFrameSize = maxFrameSize;
- }
private boolean firstRead = true;
@@ -116,9 +115,12 @@ public class AmqpCodeDecoder extends AbstractBatchDecoder {
in.readUnsignedShort();
int payloadSize = in.readInt();
in.resetReaderIndex();
- if (payloadSize > maxFrameSize) {
- throw new MalformedFrameException(
- "frame > maxFrameSize exception remoteAddress:" + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ if (ctx.channel().attr(ATTRIBUTE_KEY_MAX_FRAME_SIZE).get() != null) {
+ int maxFrameSize = ctx.channel().attr(ATTRIBUTE_KEY_MAX_FRAME_SIZE).get();
+ if (payloadSize > maxFrameSize) {
+ throw new MalformedFrameException(
+ "frame > maxFrameSize exception remoteAddress:" + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ }
}
if (in.readableBytes() >= (AMQPFrame.NON_BODY_SIZE + payloadSize)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org