You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/14 09:11:33 UTC

[incubator-eventmesh] branch protocol-amqp updated: implementation of amqp message sender

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 22541898 implementation of amqp message sender
     new b18477d2 Merge pull request #2214 from jackyluo-learning/protocol-amqp-shadow7
22541898 is described below

commit 22541898e4ba203e07903df33c168348fccc7253
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Nov 14 16:14:37 2022 +0800

    implementation of amqp message sender
---
 .../common/protocol/amqp/AmqpMessage.java          |  16 +-
 .../protocol/amqp/consumer/AmqpConsumerImpl.java   |  25 ++--
 .../protocol/amqp/consumer/AmqpMessageSender.java  | 161 +++++++++++++++++++++
 .../amqp/consumer/UnacknowledgedMessageMap.java    | 118 +++++++++++++++
 .../protocol/amqp/processor/AmqpConnection.java    |  15 +-
 5 files changed, 308 insertions(+), 27 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java
index 0d12fa8e..9b855b3b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java
@@ -19,13 +19,15 @@
 
 package org.apache.eventmesh.common.protocol.amqp;
 
-import java.util.Map;
+import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
+import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
 
-import lombok.Data;
+import java.util.Map;
 
-import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
 import com.rabbitmq.client.AMQP.BasicProperties;
 
+import lombok.Data;
+
 /**
  * message body of Amqp, including content header and content body
  */
@@ -50,4 +52,12 @@ public class AmqpMessage implements ProtocolTransportObject {
     public long getSize() {
         return contentBody == null ? 0 : contentHeader == null ? contentBody.length : contentBody.length + contentHeader.getBodySize();
     }
+
+    public String getExchange() {
+        return extendInfo.getOrDefault(ProtocolKey.EXCHANGE, "").toString();
+    }
+
+    public String getRoutingKey() {
+        return extendInfo.getOrDefault(ProtocolKey.ROUTING_KEY, "").toString();
+    }
 }
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
index fb5f3307..966d544d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
@@ -8,18 +8,18 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
 import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
 import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
-import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.cloudevents.CloudEvent;
-import io.netty.channel.ChannelFutureListener;
 
 public class AmqpConsumerImpl implements AmqpConsumer {
 
@@ -70,20 +70,13 @@ public class AmqpConsumerImpl implements AmqpConsumer {
             addUnAckMsg(deliveryTag, pushMessageContext);
         }
 
-        // TODO: 2022/10/20 convert AmqpMessage to AMQData
-        AMQData amqData = this.amqpChannel.getConnection().getAmqpInOutputConverter().convertOutput(amqpMessage);
-
-        this.amqpChannel.getConnection().getCtx().writeAndFlush(amqData).addListener(
-                (ChannelFutureListener) future -> {
-                    if (!future.isSuccess()) {
-                        logger.error("push message fail, amqData: {}", amqData);
-                        // TODO: 2022/10/21 retry strategy
-                    } else {
-                        logger.info("push message success, amqData: {}", amqData);
-                        // TODO: 2022/10/21 push success strategy
-                    }
-                }
-        );
+        try {
+            amqpChannel.getConnection().getAmqpOutputConverter().writeDeliver(amqpMessage, this.amqpChannel.getChannelId(),
+                    false, deliveryTag, consumerTag);
+        } catch (IOException e) {
+            logger.error("sendMessages IOException", e);
+            amqpChannel.closeChannel(ErrorCodes.INTERNAL_ERROR, "system error");
+        }
     }
 
     @Override
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpMessageSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpMessageSender.java
new file mode 100644
index 00000000..14224980
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpMessageSender.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+
+import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpConnection;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.Frame;
+import com.rabbitmq.client.impl.Method;
+
+import lombok.extern.log4j.Log4j2;
+
+/**
+ * Used to process command output.
+ */
+@Log4j2
+public class AmqpMessageSender {
+
+    private final AmqpConnection connection;
+
+    //
+    public AmqpMessageSender(AmqpConnection connection) {
+        this.connection = connection;
+    }
+
+    public long writeDeliver(final AmqpMessage message, int channelId,
+                             boolean isRedelivered, long deliveryTag,
+                             String consumerTag) throws IOException {
+        AMQP.Basic.Deliver deliver = connection.getCommandFactory().createBasicDeliverBody(consumerTag, deliveryTag, isRedelivered,
+                message.getExchange(), message.getRoutingKey());
+        return writeMessage(message, (Method) deliver, channelId);
+
+    }
+
+    public long writeGetOk(final AmqpMessage message, int channelId,
+                           boolean isRedelivered, long deliveryTag, int messageCount) throws IOException {
+
+        AMQP.Basic.GetOk getOk = connection.getCommandFactory().createBasicGetOkBody(deliveryTag, isRedelivered,
+                message.getExchange(), message.getRoutingKey(), messageCount);
+        return writeMessage(message, (Method) getOk, channelId);
+    }
+
+    private long writeMessage(final AmqpMessage message, final Method method, int channelId) throws IOException {
+        CompositeMessageBlock messageFrame = new CompositeMessageBlock();
+        byte[] body = message.getContentBody();
+        int bodyLength = body.length;
+        Frame headerFrame = message.getContentHeader().toFrame(channelId, bodyLength);
+
+        int frameMax = connection.getMaxFrameSize();
+        boolean cappedFrameMax = frameMax > 0;
+        int bodyPayloadMax = cappedFrameMax ? frameMax - AMQPFrame.NON_BODY_SIZE : bodyLength;
+
+        if (cappedFrameMax && headerFrame.size() > frameMax) {
+            String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
+            throw new IllegalArgumentException(msg);
+        }
+        messageFrame.setMethod(AMQPFrame.get(method.toFrame(channelId)));
+        messageFrame.setHeader(AMQPFrame.get(headerFrame));
+
+        for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
+            int remaining = body.length - offset;
+
+            int fragmentLength = (remaining < bodyPayloadMax) ? remaining
+                    : bodyPayloadMax;
+            Frame frame = Frame.fromBodyFragment(channelId, body,
+                    offset, fragmentLength);
+            messageFrame.addContent(AMQPFrame.get(frame));
+        }
+        connection.writeFrame(messageFrame);
+        return body.length;
+    }
+
+    public void writeReturn(final AmqpMessage message, int channelId, int replyCode,
+                            String replyText) throws IOException {
+
+        AMQP.Basic.Return returnBody = connection.getCommandFactory().createBasicReturnBody(replyCode, replyText, message.getExchange(), message.getRoutingKey());
+
+        writeMessage(message, (Method) returnBody, channelId);
+    }
+
+
+    public class CompositeMessageBlock implements AMQData {
+
+        AMQPFrame method;
+
+        AMQPFrame header;
+
+        List<AMQPFrame> contents;
+
+        @Override
+        public void encode(ByteBuf buf) {
+            method.encode(buf);
+            header.encode(buf);
+            contents.forEach(content -> {
+                content.encode(buf);
+            });
+        }
+
+        public CompositeMessageBlock() {
+        }
+
+        public CompositeMessageBlock(AMQPFrame method, AMQPFrame header,
+                                     List<AMQPFrame> contents) {
+            this.method = method;
+            this.header = header;
+            this.contents = contents;
+        }
+
+        public AMQPFrame getMethod() {
+            return method;
+        }
+
+        public void setMethod(AMQPFrame method) {
+            this.method = method;
+        }
+
+        public AMQPFrame getHeader() {
+            return header;
+        }
+
+        public void setHeader(AMQPFrame header) {
+            this.header = header;
+        }
+
+        public List<AMQPFrame> getContents() {
+            return contents;
+        }
+
+        public void setContents(List<AMQPFrame> contents) {
+            this.contents = contents;
+        }
+
+        public void addContent(AMQPFrame content) {
+            if (this.contents == null) {
+                this.contents = new ArrayList<>(2);
+            }
+            contents.add(content);
+        }
+
+
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/UnacknowledgedMessageMap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/UnacknowledgedMessageMap.java
new file mode 100644
index 00000000..34dbbfa6
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/UnacknowledgedMessageMap.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * unack message map.
+ */
+public class UnacknowledgedMessageMap {
+
+    /**
+     * unAck positionInfo.
+     */
+    public static final class MessageConsumerAssociation {
+        private final String messageId;
+        private final AmqpConsumer consumer;
+        private final int size;
+
+        private MessageConsumerAssociation(String messageId, AmqpConsumer consumer, int size) {
+            this.messageId = messageId;
+            this.consumer = consumer;
+            this.size = size;
+        }
+
+        public String getMessageId() {
+            return messageId;
+        }
+
+        public AmqpConsumer getConsumer() {
+            return consumer;
+        }
+
+        public int getSize() {
+            return size;
+        }
+    }
+
+    private final Map<Long, MessageConsumerAssociation> map = new ConcurrentHashMap<>();
+    private final AmqpChannel channel;
+
+    public UnacknowledgedMessageMap(AmqpChannel channel) {
+        this.channel = channel;
+    }
+
+    public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple) {
+        if (multiple) {
+            Map<Long, MessageConsumerAssociation> acks = new HashMap<>();
+            map.entrySet().stream().forEach(entry -> {
+                if (entry.getKey() <= deliveryTag) {
+                    acks.put(entry.getKey(), entry.getValue());
+                }
+            });
+            remove(acks.keySet());
+            return acks.values();
+        } else {
+            MessageConsumerAssociation association = remove(deliveryTag);
+            if (association != null) {
+                return Collections.singleton(association);
+            }
+        }
+        return Collections.emptySet();
+    }
+
+    public Collection<MessageConsumerAssociation> acknowledgeAll() {
+        Set<MessageConsumerAssociation> associations = new HashSet<>();
+        associations.addAll(map.values());
+        map.clear();
+        return associations;
+    }
+
+    public void add(long deliveryTag, String messageId, AmqpConsumer consumer, int size) {
+        checkNotNull(messageId);
+        checkNotNull(consumer);
+        map.put(deliveryTag, new MessageConsumerAssociation(messageId, consumer, size));
+    }
+
+    public void remove(Collection<Long> deliveryTag) {
+        deliveryTag.stream().forEach(tag -> {
+            map.remove(tag);
+        });
+    }
+
+    public MessageConsumerAssociation remove(long deliveryTag) {
+        MessageConsumerAssociation entry = map.remove(deliveryTag);
+        return entry;
+    }
+
+    public int size() {
+        return map.size();
+    }
+
+    @VisibleForTesting
+    public Map<Long, MessageConsumerAssociation> getMap() {
+        return map;
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
index a3f9e22e..7455c0f0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
@@ -23,6 +23,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 import lombok.Getter;
 import org.apache.eventmesh.runtime.boot.EventMeshAmqpServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpMessageSender;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.CommandFactory;
@@ -83,8 +84,7 @@ public class AmqpConnection extends AmqpHandler {
     private String virtualHostName;
     private final Object channelAddRemoveLock = new Object();
     private AtomicBoolean blocked = new AtomicBoolean();
-    private AmqpInOutputConverter amqpInOutputConverter;
-    //private AmqpMessageSender amqpOutputConverter;
+    private AmqpMessageSender amqpOutputConverter;
 
 
     public AmqpConnection(EventMeshAmqpServer amqpBrokerService) {
@@ -98,8 +98,11 @@ public class AmqpConnection extends AmqpHandler {
         this.maxChannels = amqpConfig.maxNoOfChannels;
         this.maxFrameSize = amqpConfig.maxFrameSize;
         this.heartBeat = amqpConfig.heartBeat;
-        this.amqpInOutputConverter = new AmqpInOutputConverter(this);
-        //this.amqpOutputConverter = new AmqpMessageSender(this);
+        this.amqpOutputConverter = new AmqpMessageSender(this);
+    }
+
+    public AmqpMessageSender getAmqpOutputConverter() {
+        return amqpOutputConverter;
     }
 
     @Override
@@ -625,8 +628,4 @@ public class AmqpConnection extends AmqpHandler {
         return ctx.channel().isActive();
     }
 
-    public AmqpInOutputConverter getAmqpInOutputConverter() {
-        return amqpInOutputConverter;
-    }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org