You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/14 07:35:20 UTC

[incubator-eventmesh] branch protocol-amqp updated: implementation of command

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 6f3f43ca implementation of command
     new e6a0ad45 Merge pull request #2210 from jackyluo-learning/protocol-amqp-shadow5
6f3f43ca is described below

commit 6f3f43ca11a9f6edf4b85bdde0573db6529f78ae
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Nov 14 15:20:58 2022 +0800

    implementation of command
---
 .../amqp/exception/UnexpectedFrameError.java       |  32 +++
 .../core/protocol/amqp/remoting/AmqpCommand.java   | 227 +++++++++++++++++++++
 .../core/protocol/amqp/remoting/Command.java       |  39 ++++
 3 files changed, 298 insertions(+)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exception/UnexpectedFrameError.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exception/UnexpectedFrameError.java
new file mode 100644
index 00000000..4d1fe184
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/exception/UnexpectedFrameError.java
@@ -0,0 +1,32 @@
+
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.exception;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
+
+/**
+ * Thrown when the command parser hits an unexpected frame type.
+ */
+public class UnexpectedFrameError extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    private final AMQPFrame _frame;
+    private final int _expectedFrameType;
+
+    public UnexpectedFrameError(AMQPFrame frame, int expectedFrameType) {
+        super("Received frame: " + frame + ", expected type " + expectedFrameType);
+        _frame = frame;
+        _expectedFrameType = expectedFrameType;
+    }
+
+    public static long getSerialVersionUID() {
+        return serialVersionUID;
+    }
+
+    public AMQPFrame getReceivedFrame() {
+        return _frame;
+    }
+
+    public int getExpectedFrameType() {
+        return _expectedFrameType;
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AmqpCommand.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AmqpCommand.java
new file mode 100644
index 00000000..3de8636d
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/AmqpCommand.java
@@ -0,0 +1,227 @@
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.remoting;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.exception.UnexpectedFrameError;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.AMQContentHeader;
+import com.rabbitmq.client.impl.AMQImpl;
+import com.rabbitmq.client.impl.Method;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AmqpCommand implements Command {
+
+    private static final ByteBuf EMPTY_BYTE_ARRAY = Unpooled.EMPTY_BUFFER;
+
+    /** Current state, used to decide how to handle each incoming frame. */
+    private enum CAState {
+        EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE
+    }
+
+    private CAState state;
+
+    /** The method for this command */
+    private Method method;
+
+    /** The content header for this command */
+    private AMQContentHeader contentHeader;
+
+    /** The fragments of this command's content body - a list of byte[] */
+    private List<ByteBuf> bodyN;
+    /** sum of the lengths of all fragments */
+    private int bodyLength;
+
+    /** No bytes of content body not yet accumulated */
+    private long remainingBodyBytes;
+
+    public static AmqpCommand get(Method method) {
+        return AmqpCommand.get(method, null, null);
+    }
+
+    public static AmqpCommand get(Method method, AMQContentHeader contentHeader, ByteBuf body) {
+        AmqpCommand command = RECYCLER.get();
+        CAState state;
+        int bodyLength = 0;
+        long remainingBodyBytes = 0;
+
+        if (command.bodyN == null) {
+            command.bodyN = new ArrayList<>(2);
+        } else {
+            command.bodyN.clear();
+        }
+
+        if (body != null && body.readableBytes() > 0) {
+            command.bodyN.add(body);
+            bodyLength += body.readableBytes();
+        }
+
+        if (method == null) {
+            state = CAState.EXPECTING_METHOD;
+        } else if (contentHeader == null) {
+            state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
+        } else {
+            remainingBodyBytes = contentHeader.getBodySize() - bodyLength;
+            state = (remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE;
+        }
+
+        command.method = method;
+        command.contentHeader = contentHeader;
+        command.bodyLength = bodyLength;
+        command.remainingBodyBytes = remainingBodyBytes;
+        command.state = state;
+        return command;
+    }
+
+    @Override
+    public Method getMethod() {
+        return this.method;
+    }
+
+    @Override
+    public AMQContentHeader getContentHeader() {
+        return this.contentHeader;
+    }
+
+    /** @return true if the command is complete */
+    public synchronized boolean isComplete() {
+        return (this.state == CAState.COMPLETE);
+    }
+
+    /** Decides whether more body frames are expected */
+    private void updateContentBodyState() {
+        this.state = (this.remainingBodyBytes > 0) ? CAState.EXPECTING_CONTENT_BODY : CAState.COMPLETE;
+    }
+
+    private void consumeMethodFrame(AMQPFrame f) throws IOException {
+        if (f.getType() == AMQP.FRAME_METHOD) {
+            this.method = AMQImpl.readMethodFrom(f.getInputStream());
+            this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
+        } else {
+            throw new UnexpectedFrameError(f, AMQP.FRAME_METHOD);
+        }
+    }
+
+    private void consumeHeaderFrame(AMQPFrame f) throws IOException {
+        if (f.getType() == AMQP.FRAME_HEADER) {
+            this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
+            this.remainingBodyBytes = this.contentHeader.getBodySize();
+            updateContentBodyState();
+        } else {
+            throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
+        }
+    }
+
+    private void consumeBodyFrame(AMQPFrame f) {
+        if (f.getType() == AMQP.FRAME_BODY) {
+            ByteBuf fragment = Unpooled.copiedBuffer(f.getPayload());
+            this.remainingBodyBytes -= fragment.readableBytes();
+            updateContentBodyState();
+            if (this.remainingBodyBytes < 0) {
+                throw new UnsupportedOperationException("%%%%%% FIXME unimplemented");
+            }
+            appendBodyFragment(fragment);
+        } else {
+            throw new UnexpectedFrameError(f, AMQP.FRAME_BODY);
+        }
+    }
+
+    /** Stitches together a fragmented content body into a single byte array */
+    private ByteBuf coalesceContentBody() {
+        if (this.bodyLength == 0) {
+            return EMPTY_BYTE_ARRAY;
+        }
+
+        if (this.bodyN.size() == 1) {
+            return this.bodyN.get(0);
+        }
+
+        ByteBuf body = Unpooled.buffer(bodyLength);
+        for (ByteBuf fragment : this.bodyN) {
+            body.writeBytes(fragment);
+        }
+        this.bodyN.clear();
+        this.bodyN.add(body);
+        return body;
+    }
+
+    @Override
+    public synchronized ByteBuf getContentBody() {
+        return coalesceContentBody();
+    }
+
+    @VisibleForTesting
+    public List<ByteBuf> getBodyN() {
+        return bodyN;
+    }
+
+    private void appendBodyFragment(ByteBuf fragment) {
+        if (fragment == null || fragment.readableBytes() == 0) {
+            return;
+        }
+
+        bodyN.add(fragment);
+        bodyLength += fragment.readableBytes();
+    }
+
+    /**
+     * @param f frame to be incorporated
+     * @return true if command becomes complete
+     * @throws IOException if error reading frame
+     */
+    public synchronized boolean handleFrame(AMQPFrame f) throws IOException {
+        switch (this.state) {
+            case EXPECTING_METHOD:
+                consumeMethodFrame(f);
+                break;
+            case EXPECTING_CONTENT_HEADER:
+                consumeHeaderFrame(f);
+                break;
+            case EXPECTING_CONTENT_BODY:
+                consumeBodyFrame(f);
+                break;
+
+            default:
+                throw new IllegalStateException("Bad Command State " + this.state);
+        }
+        return isComplete();
+    }
+
+    private final Recycler.Handle<AmqpCommand> recyclerHandle;
+
+    private AmqpCommand(Recycler.Handle<AmqpCommand> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    private static final Recycler<AmqpCommand> RECYCLER = new Recycler<AmqpCommand>() {
+
+        @Override
+        protected AmqpCommand newObject(Handle<AmqpCommand> handle) {
+            return new AmqpCommand(handle);
+        }
+    };
+
+    public void recycle() {
+        this.state = null;
+        this.method = null;
+        this.contentHeader = null;
+
+        if (this.bodyN != null) {
+            this.bodyN.clear();
+        }
+
+        this.bodyLength = -1;
+        this.remainingBodyBytes = -1;
+
+        if (recyclerHandle != null) {
+            recyclerHandle.recycle(this);
+        }
+
+    }
+
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/Command.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/Command.java
new file mode 100644
index 00000000..b3fef496
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/Command.java
@@ -0,0 +1,39 @@
+
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.remoting;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.ContentHeader;
+import com.rabbitmq.client.Method;
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Interface to a container for an AMQP method-and-arguments, with optional content header and body.
+ */
+public interface Command {
+    /**
+     * Retrieves the {@link Method} held within this Command. Downcast to
+     * concrete (implementation-specific!) subclasses as necessary.
+     *
+     * @return the command's method.
+     */
+    Method getMethod();
+
+    /**
+     * Retrieves the ContentHeader subclass instance held as part of this Command, if any.
+     *
+     * Downcast to one of the inner classes of AMQP,
+     * for instance {@link AMQP.BasicProperties}, as appropriate.
+     *
+     * @return the Command's {@link ContentHeader}, or null if none
+     */
+    ContentHeader getContentHeader();
+
+    /**
+     * Retrieves the body byte array that travelled as part of this
+     * Command, if any.
+     *
+     * @return the Command's content body, or null if none
+     */
+    ByteBuf getContentBody();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org