You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/09/27 02:29:45 UTC

[incubator-eventmesh] branch protocol-amqp updated: realization amqp command processor

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 75fe16ce realization amqp command processor
     new e5c98ba2 Merge pull request #1427 from wangshaojie4039/protocol-amqp
75fe16ce is described below

commit 75fe16ce519237b9c7a04a91478e335e9651870e
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Tue Sep 27 09:52:49 2022 +0800

    realization amqp command processor
---
 .../amqp/exception/AmqpFrameDecodingException.java |   8 +
 .../core/protocol/amqp/processor/AmqpHandler.java  | 378 +++++++++++++++++++++
 2 files changed, 386 insertions(+)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exception/AmqpFrameDecodingException.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exception/AmqpFrameDecodingException.java
new file mode 100644
index 00000000..c3ef4f2b
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exception/AmqpFrameDecodingException.java
@@ -0,0 +1,8 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.exception;
+
+public class AmqpFrameDecodingException extends Exception {
+
+    public AmqpFrameDecodingException(String message) {
+        super(message);
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpHandler.java
new file mode 100644
index 00000000..0ba67713
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpHandler.java
@@ -0,0 +1,378 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.processor;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.exception.AmqpFrameDecodingException;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolFrame;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.UnknownClassOrMethodId;
+import com.rabbitmq.client.impl.AMQImpl;
+import com.rabbitmq.client.impl.Method;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class AmqpHandler extends ChannelInboundHandlerAdapter implements ConnectionMethodProcessor {
+
+    protected ChannelHandlerContext ctx;
+    protected SocketAddress remoteAddress;
+    // TODO
+    protected final Object amqpServer;
+    @Getter
+    protected AtomicBoolean isActive = new AtomicBoolean(false);
+
+    protected AmqpHandler(Object amqpServer) {
+        this.amqpServer = amqpServer;
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
+        }
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof List) {
+            for (final Object m : (List<?>) msg) {
+                AmqpHandler.this.process(ctx, m);
+            }
+        } else {
+            process(ctx, msg);
+        }
+
+    }
+
+    private void process(ChannelHandlerContext ctx, Object msg) throws AmqpFrameDecodingException {
+        if (msg instanceof ProtocolFrame) {
+            receiveProtocolHeader((ProtocolFrame) msg);
+        } else if (msg instanceof AMQPFrame) {
+            processFrame((AMQPFrame) msg);
+        }
+
+    }
+
+    private void processFrame(AMQPFrame frame) throws AmqpFrameDecodingException {
+        try {
+            int channelNo = frame.getChannel();
+            switch (frame.getType()) {
+                case AMQP.FRAME_METHOD:
+                    try {
+                        Method method = AMQImpl.readMethodFrom(frame.getInputStream());
+                        processMethod(channelNo, method);
+                    } catch (UnknownClassOrMethodId e) {
+                        log.error("method frame serializer exception  classId {} ,methodId {}", e.classId, e.methodId, e);
+                    } catch (IOException e) {
+                        log.error("method frame serializer exception", e);
+                        //throw new AMQFrameDecodingException("method frame serializer exception: " + AMQP.FRAME_METHOD);
+                    }
+
+                    break;
+                case AMQP.FRAME_HEADER:
+                    getChannelMethodProcessor(channelNo).receiveMessageHeader(frame);
+                    break;
+                case AMQP.FRAME_BODY:
+                    getChannelMethodProcessor(channelNo).receiveMessageContent(frame);
+                    break;
+                case AMQP.FRAME_HEARTBEAT:
+                    receiveHeartbeat();
+                    break;
+                default:
+                    throw new AmqpFrameDecodingException("Unsupported frame type: " + frame.getType());
+            }
+        } finally {
+            // safe release??
+            frame.getPayload().release();
+        }
+    }
+
+    private void processMethod(int channelNo, Method method) throws UnknownClassOrMethodId {
+        int classId = method.protocolClassId();
+        int methodId = method.protocolMethodId();
+        if (ignoreAllButCloseOk()) {
+            if (!(classId == 10 && methodId == 51)) {
+                return;
+            }
+        }
+        if (classId != 10 && getChannelMethodProcessor(channelNo).ignoreAllButCloseOk()) {
+            if (!(classId == 20 && methodId == 41)) {
+                return;
+            }
+        }
+        switch (classId) {
+            //CONNECTION_CLASS:
+            case 10:
+                switch (methodId) {
+                    case 11: {
+                        AMQP.Connection.StartOk startOk = (AMQP.Connection.StartOk) method;
+                        receiveConnectionStartOk(startOk.getClientProperties(), startOk.getMechanism(), startOk.getResponse(), startOk.getLocale());
+                    }
+                    break;
+                    case 21: {
+                        AMQP.Connection.SecureOk secureOk = (AMQP.Connection.SecureOk) method;
+                        receiveConnectionSecureOk(secureOk.getResponse());
+                    }
+                    break;
+                    case 31: {
+                        AMQP.Connection.TuneOk tuneOk = (AMQP.Connection.TuneOk) method;
+                        receiveConnectionTuneOk(tuneOk.getChannelMax(), tuneOk.getFrameMax(), tuneOk.getHeartbeat());
+                    }
+                    break;
+                    case 40: {
+                        AMQP.Connection.Open open = (AMQP.Connection.Open) method;
+                        receiveConnectionOpen(open.getVirtualHost(), open.getCapabilities(), open.getInsist());
+                    }
+                    break;
+                    case 50: {
+                        AMQP.Connection.Close close = (AMQP.Connection.Close) method;
+                        receiveConnectionClose(close.getReplyCode(), close.getReplyText(), close.getClassId(), close.getMethodId());
+                    }
+                    break;
+                    case 51: {
+                        receiveConnectionCloseOk();
+                    }
+                    break;
+                    // TODO need impl???
+//                    case 60: {
+//                        return new AMQImpl.Connection.Blocked(new MethodArgumentReader(new ValueReader(in)));
+//                    }
+//                    case 61: {
+//                        return new AMQImpl.Connection.Unblocked(new MethodArgumentReader(new ValueReader(in)));
+//                    }
+//                    case 70: {
+//                        return new AMQImpl.Connection.UpdateSecret(new MethodArgumentReader(new ValueReader(in)));
+//                    }
+//                    case 71: {
+//                        return new AMQImpl.Connection.UpdateSecretOk(new MethodArgumentReader(new ValueReader(in)));
+//                    }
+                    default:
+                        break;
+                }
+                break;
+            case 20:
+                switch (methodId) {
+                    case 10: {
+                        receiveChannelOpen(channelNo);
+                    }
+                    break;
+                    case 20: {
+                        AMQP.Channel.Flow flow = (AMQP.Channel.Flow) method;
+                        getChannelMethodProcessor(channelNo).receiveChannelFlow(flow.getActive());
+                    }
+                    break;
+                    case 21: {
+                        AMQP.Channel.FlowOk flowOk = (AMQP.Channel.FlowOk) method;
+                        getChannelMethodProcessor(channelNo).receiveChannelFlowOk(flowOk.getActive());
+                    }
+                    break;
+                    case 40: {
+                        AMQP.Channel.Close close = (AMQP.Channel.Close) method;
+                        getChannelMethodProcessor(channelNo).receiveChannelClose(close.getReplyCode(),
+                            close.getReplyText(), close.getClassId(), close.getMethodId());
+                    }
+                    break;
+                    case 41: {
+                        getChannelMethodProcessor(channelNo).receiveChannelCloseOk();
+                    }
+                    break;
+                    default:
+                        break;
+                }
+                break;
+            case 30:
+                switch (methodId) {
+                    case 10: {
+                        AMQP.Access.Request request = (AMQP.Access.Request) method;
+                        getChannelMethodProcessor(channelNo).receiveAccessRequest(request.getRealm(),
+                            request.getExclusive(), request.getPassive(), request.getActive(), request.getWrite(),
+                            request.getRead());
+                    }
+                    break;
+
+                    default:
+                        break;
+                }
+                break;
+            case 40:
+                switch (methodId) {
+                    case 10: {
+                        AMQP.Exchange.Declare declare = (AMQP.Exchange.Declare) method;
+                        getChannelMethodProcessor(channelNo).receiveExchangeDeclare(declare.getExchange(),
+                            declare.getType(), declare.getPassive(), declare.getDurable(), declare.getAutoDelete(),
+                            declare.getInternal(), declare.getNowait(), declare.getArguments());
+                    }
+                    break;
+                    case 20: {
+                        AMQP.Exchange.Delete delete = (AMQP.Exchange.Delete) method;
+                        getChannelMethodProcessor(channelNo).receiveExchangeDelete(delete.getExchange(), delete.getIfUnused(), delete.getNowait());
+                    }
+                    break;
+
+                    case 30: {
+                        AMQP.Exchange.Bind bind = (AMQP.Exchange.Bind) method;
+                        getChannelMethodProcessor(channelNo).receiveExchangeBound(bind.getSource(), bind.getRoutingKey(), bind.getDestination());
+                    }
+                    break;
+//                    case 40: {
+//                        getChannelMethodProcessor(channelNo).receiveExchangeBound(method);
+//                        return new AMQImpl.Exchange.Unbind(new MethodArgumentReader(new ValueReader(in)));
+//                    }
+//                    case 51: {
+//                        return new AMQImpl.Exchange.UnbindOk(new MethodArgumentReader(new ValueReader(in)));
+//                    }
+                    default:
+                        break;
+                }
+                break;
+            case 50:
+                switch (methodId) {
+                    case 10: {
+                        AMQP.Queue.Declare declare = (AMQP.Queue.Declare) method;
+                        getChannelMethodProcessor(channelNo).receiveQueueDeclare(declare.getQueue(),
+                            declare.getPassive(), declare.getDurable(), declare.getExclusive(), declare.getAutoDelete(),
+                            declare.getNowait(), declare.getArguments());
+                    }
+                    break;
+                    case 20: {
+                        AMQP.Queue.Bind bind = (AMQP.Queue.Bind) method;
+                        getChannelMethodProcessor(channelNo).receiveQueueBind(bind.getQueue(), bind.getExchange(),
+                            bind.getRoutingKey(), bind.getNowait(), bind.getArguments());
+                    }
+                    break;
+                    case 30: {
+                        AMQP.Queue.Purge purge = (AMQP.Queue.Purge) method;
+                        getChannelMethodProcessor(channelNo).receiveQueuePurge(purge.getQueue(), purge.getNowait());
+                    }
+                    break;
+                    case 40: {
+                        AMQP.Queue.Delete delete = (AMQP.Queue.Delete) method;
+                        getChannelMethodProcessor(channelNo).receiveQueueDelete(delete.getQueue(), delete.getIfUnused(),
+                            delete.getIfEmpty(), delete.getNowait());
+                    }
+                    break;
+                    case 50: {
+                        AMQP.Queue.Unbind unbind = (AMQP.Queue.Unbind) method;
+                        getChannelMethodProcessor(channelNo).receiveQueueUnbind(unbind.getQueue(), unbind.getExchange(),
+                            unbind.getRoutingKey(), unbind.getArguments());
+                    }
+                    break;
+
+                    default:
+                        break;
+                }
+                break;
+            case 60:
+                switch (methodId) {
+                    case 10: {
+                        AMQP.Basic.Qos qos = (AMQP.Basic.Qos) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicQos(qos.getPrefetchSize(),
+                            qos.getPrefetchCount(), qos.getGlobal());
+                    }
+                    break;
+                    case 20: {
+                        AMQP.Basic.Consume consume = (AMQP.Basic.Consume) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicConsume(consume.getQueue(),
+                            consume.getConsumerTag(), consume.getNoLocal(), consume.getNoAck(), consume.getExclusive(),
+                            consume.getNowait(), consume.getArguments());
+                    }
+                    break;
+                    case 30: {
+                        AMQP.Basic.Cancel cancel = (AMQP.Basic.Cancel) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicCancel(cancel.getConsumerTag(), cancel.getNowait());
+                    }
+                    break;
+                    case 40: {
+                        AMQP.Basic.Publish publish = (AMQP.Basic.Publish) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicPublish(publish.getExchange(),
+                            publish.getRoutingKey(), publish.getMandatory(), publish.getImmediate());
+                    }
+                    break;
+
+                    case 70: {
+                        AMQP.Basic.Get get = (AMQP.Basic.Get) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicGet(get.getQueue(), get.getNoAck());
+                    }
+                    break;
+
+                    case 80: {
+                        AMQP.Basic.Ack ack = (AMQP.Basic.Ack) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicAck(ack.getDeliveryTag(), ack.getMultiple());
+                    }
+                    break;
+                    case 90: {
+                        AMQP.Basic.Reject reject = (AMQP.Basic.Reject) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicReject(reject.getDeliveryTag(), reject.getRequeue());
+                    }
+                    break;
+                    case 100: {
+                        AMQP.Basic.RecoverAsync recover = (AMQP.Basic.RecoverAsync) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicRecover(recover.getRequeue(), false);
+                    }
+                    break;
+                    case 110: {
+                        AMQP.Basic.Recover recover = (AMQP.Basic.Recover) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicRecover(recover.getRequeue(), true);
+                    }
+                    break;
+                    case 120: {
+                        AMQP.Basic.Nack noack = (AMQP.Basic.Nack) method;
+                        getChannelMethodProcessor(channelNo).receiveBasicNack(noack.getDeliveryTag(), noack.getMultiple(), noack.getRequeue());
+                    }
+                    break;
+                    default:
+                        break;
+                }
+                break;
+            case 90:
+                switch (methodId) {
+                    case 10: {
+                        getChannelMethodProcessor(channelNo).receiveTxSelect();
+                    }
+                    break;
+
+                    case 20: {
+                        getChannelMethodProcessor(channelNo).receiveTxCommit();
+                    }
+                    break;
+
+                    case 30: {
+                        getChannelMethodProcessor(channelNo).receiveTxRollback();
+                    }
+                    break;
+
+                    default:
+                        break;
+                }
+                break;
+            case 85:
+                switch (methodId) {
+                    case 10: {
+                        AMQP.Confirm.Select select = (AMQP.Confirm.Select) method;
+                        getChannelMethodProcessor(channelNo).receiveConfirmSelect(select.getNowait());
+                    }
+                    break;
+                    default:
+                        break;
+                }
+                break;
+        }
+
+    }
+
+    public abstract void close();
+
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org