You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/14 09:11:33 UTC
[incubator-eventmesh] branch protocol-amqp updated: implementation of amqp message sender
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 22541898 implementation of amqp message sender
new b18477d2 Merge pull request #2214 from jackyluo-learning/protocol-amqp-shadow7
22541898 is described below
commit 22541898e4ba203e07903df33c168348fccc7253
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Nov 14 16:14:37 2022 +0800
implementation of amqp message sender
---
.../common/protocol/amqp/AmqpMessage.java | 16 +-
.../protocol/amqp/consumer/AmqpConsumerImpl.java | 25 ++--
.../protocol/amqp/consumer/AmqpMessageSender.java | 161 +++++++++++++++++++++
.../amqp/consumer/UnacknowledgedMessageMap.java | 118 +++++++++++++++
.../protocol/amqp/processor/AmqpConnection.java | 15 +-
5 files changed, 308 insertions(+), 27 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java
index 0d12fa8e..9b855b3b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java
@@ -19,13 +19,15 @@
package org.apache.eventmesh.common.protocol.amqp;
-import java.util.Map;
+import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
+import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
-import lombok.Data;
+import java.util.Map;
-import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import com.rabbitmq.client.AMQP.BasicProperties;
+import lombok.Data;
+
/**
* message body of Amqp, including content header and content body
*/
@@ -50,4 +52,12 @@ public class AmqpMessage implements ProtocolTransportObject {
public long getSize() {
return contentBody == null ? 0 : contentHeader == null ? contentBody.length : contentBody.length + contentHeader.getBodySize();
}
+
+ public String getExchange() {
+ return extendInfo.getOrDefault(ProtocolKey.EXCHANGE, "").toString();
+ }
+
+ public String getRoutingKey() {
+ return extendInfo.getOrDefault(ProtocolKey.ROUTING_KEY, "").toString();
+ }
}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
index fb5f3307..966d544d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
@@ -8,18 +8,18 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
-import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.cloudevents.CloudEvent;
-import io.netty.channel.ChannelFutureListener;
public class AmqpConsumerImpl implements AmqpConsumer {
@@ -70,20 +70,13 @@ public class AmqpConsumerImpl implements AmqpConsumer {
addUnAckMsg(deliveryTag, pushMessageContext);
}
- // TODO: 2022/10/20 convert AmqpMessage to AMQData
- AMQData amqData = this.amqpChannel.getConnection().getAmqpInOutputConverter().convertOutput(amqpMessage);
-
- this.amqpChannel.getConnection().getCtx().writeAndFlush(amqData).addListener(
- (ChannelFutureListener) future -> {
- if (!future.isSuccess()) {
- logger.error("push message fail, amqData: {}", amqData);
- // TODO: 2022/10/21 retry strategy
- } else {
- logger.info("push message success, amqData: {}", amqData);
- // TODO: 2022/10/21 push success strategy
- }
- }
- );
+ try {
+ amqpChannel.getConnection().getAmqpOutputConverter().writeDeliver(amqpMessage, this.amqpChannel.getChannelId(),
+ false, deliveryTag, consumerTag);
+ } catch (IOException e) {
+ logger.error("sendMessages IOException", e);
+ amqpChannel.closeChannel(ErrorCodes.INTERNAL_ERROR, "system error");
+ }
}
@Override
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpMessageSender.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpMessageSender.java
new file mode 100644
index 00000000..14224980
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpMessageSender.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+
+import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpConnection;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.Frame;
+import com.rabbitmq.client.impl.Method;
+
+import lombok.extern.log4j.Log4j2;
+
+/**
+ * Used to process command output.
+ */
+@Log4j2
+public class AmqpMessageSender {
+
+ private final AmqpConnection connection;
+
+ //
+ public AmqpMessageSender(AmqpConnection connection) {
+ this.connection = connection;
+ }
+
+ public long writeDeliver(final AmqpMessage message, int channelId,
+ boolean isRedelivered, long deliveryTag,
+ String consumerTag) throws IOException {
+ AMQP.Basic.Deliver deliver = connection.getCommandFactory().createBasicDeliverBody(consumerTag, deliveryTag, isRedelivered,
+ message.getExchange(), message.getRoutingKey());
+ return writeMessage(message, (Method) deliver, channelId);
+
+ }
+
+ public long writeGetOk(final AmqpMessage message, int channelId,
+ boolean isRedelivered, long deliveryTag, int messageCount) throws IOException {
+
+ AMQP.Basic.GetOk getOk = connection.getCommandFactory().createBasicGetOkBody(deliveryTag, isRedelivered,
+ message.getExchange(), message.getRoutingKey(), messageCount);
+ return writeMessage(message, (Method) getOk, channelId);
+ }
+
+ private long writeMessage(final AmqpMessage message, final Method method, int channelId) throws IOException {
+ CompositeMessageBlock messageFrame = new CompositeMessageBlock();
+ byte[] body = message.getContentBody();
+ int bodyLength = body.length;
+ Frame headerFrame = message.getContentHeader().toFrame(channelId, bodyLength);
+
+ int frameMax = connection.getMaxFrameSize();
+ boolean cappedFrameMax = frameMax > 0;
+ int bodyPayloadMax = cappedFrameMax ? frameMax - AMQPFrame.NON_BODY_SIZE : bodyLength;
+
+ if (cappedFrameMax && headerFrame.size() > frameMax) {
+ String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
+ throw new IllegalArgumentException(msg);
+ }
+ messageFrame.setMethod(AMQPFrame.get(method.toFrame(channelId)));
+ messageFrame.setHeader(AMQPFrame.get(headerFrame));
+
+ for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
+ int remaining = body.length - offset;
+
+ int fragmentLength = (remaining < bodyPayloadMax) ? remaining
+ : bodyPayloadMax;
+ Frame frame = Frame.fromBodyFragment(channelId, body,
+ offset, fragmentLength);
+ messageFrame.addContent(AMQPFrame.get(frame));
+ }
+ connection.writeFrame(messageFrame);
+ return body.length;
+ }
+
+ public void writeReturn(final AmqpMessage message, int channelId, int replyCode,
+ String replyText) throws IOException {
+
+ AMQP.Basic.Return returnBody = connection.getCommandFactory().createBasicReturnBody(replyCode, replyText, message.getExchange(), message.getRoutingKey());
+
+ writeMessage(message, (Method) returnBody, channelId);
+ }
+
+
+ public class CompositeMessageBlock implements AMQData {
+
+ AMQPFrame method;
+
+ AMQPFrame header;
+
+ List<AMQPFrame> contents;
+
+ @Override
+ public void encode(ByteBuf buf) {
+ method.encode(buf);
+ header.encode(buf);
+ contents.forEach(content -> {
+ content.encode(buf);
+ });
+ }
+
+ public CompositeMessageBlock() {
+ }
+
+ public CompositeMessageBlock(AMQPFrame method, AMQPFrame header,
+ List<AMQPFrame> contents) {
+ this.method = method;
+ this.header = header;
+ this.contents = contents;
+ }
+
+ public AMQPFrame getMethod() {
+ return method;
+ }
+
+ public void setMethod(AMQPFrame method) {
+ this.method = method;
+ }
+
+ public AMQPFrame getHeader() {
+ return header;
+ }
+
+ public void setHeader(AMQPFrame header) {
+ this.header = header;
+ }
+
+ public List<AMQPFrame> getContents() {
+ return contents;
+ }
+
+ public void setContents(List<AMQPFrame> contents) {
+ this.contents = contents;
+ }
+
+ public void addContent(AMQPFrame content) {
+ if (this.contents == null) {
+ this.contents = new ArrayList<>(2);
+ }
+ contents.add(content);
+ }
+
+
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/UnacknowledgedMessageMap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/UnacknowledgedMessageMap.java
new file mode 100644
index 00000000..34dbbfa6
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/UnacknowledgedMessageMap.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * unack message map.
+ */
+public class UnacknowledgedMessageMap {
+
+ /**
+ * unAck positionInfo.
+ */
+ public static final class MessageConsumerAssociation {
+ private final String messageId;
+ private final AmqpConsumer consumer;
+ private final int size;
+
+ private MessageConsumerAssociation(String messageId, AmqpConsumer consumer, int size) {
+ this.messageId = messageId;
+ this.consumer = consumer;
+ this.size = size;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public AmqpConsumer getConsumer() {
+ return consumer;
+ }
+
+ public int getSize() {
+ return size;
+ }
+ }
+
+ private final Map<Long, MessageConsumerAssociation> map = new ConcurrentHashMap<>();
+ private final AmqpChannel channel;
+
+ public UnacknowledgedMessageMap(AmqpChannel channel) {
+ this.channel = channel;
+ }
+
+ public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple) {
+ if (multiple) {
+ Map<Long, MessageConsumerAssociation> acks = new HashMap<>();
+ map.entrySet().stream().forEach(entry -> {
+ if (entry.getKey() <= deliveryTag) {
+ acks.put(entry.getKey(), entry.getValue());
+ }
+ });
+ remove(acks.keySet());
+ return acks.values();
+ } else {
+ MessageConsumerAssociation association = remove(deliveryTag);
+ if (association != null) {
+ return Collections.singleton(association);
+ }
+ }
+ return Collections.emptySet();
+ }
+
+ public Collection<MessageConsumerAssociation> acknowledgeAll() {
+ Set<MessageConsumerAssociation> associations = new HashSet<>();
+ associations.addAll(map.values());
+ map.clear();
+ return associations;
+ }
+
+ public void add(long deliveryTag, String messageId, AmqpConsumer consumer, int size) {
+ checkNotNull(messageId);
+ checkNotNull(consumer);
+ map.put(deliveryTag, new MessageConsumerAssociation(messageId, consumer, size));
+ }
+
+ public void remove(Collection<Long> deliveryTag) {
+ deliveryTag.stream().forEach(tag -> {
+ map.remove(tag);
+ });
+ }
+
+ public MessageConsumerAssociation remove(long deliveryTag) {
+ MessageConsumerAssociation entry = map.remove(deliveryTag);
+ return entry;
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+ @VisibleForTesting
+ public Map<Long, MessageConsumerAssociation> getMap() {
+ return map;
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
index a3f9e22e..7455c0f0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
@@ -23,6 +23,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import lombok.Getter;
import org.apache.eventmesh.runtime.boot.EventMeshAmqpServer;
import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpMessageSender;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.CommandFactory;
@@ -83,8 +84,7 @@ public class AmqpConnection extends AmqpHandler {
private String virtualHostName;
private final Object channelAddRemoveLock = new Object();
private AtomicBoolean blocked = new AtomicBoolean();
- private AmqpInOutputConverter amqpInOutputConverter;
- //private AmqpMessageSender amqpOutputConverter;
+ private AmqpMessageSender amqpOutputConverter;
public AmqpConnection(EventMeshAmqpServer amqpBrokerService) {
@@ -98,8 +98,11 @@ public class AmqpConnection extends AmqpHandler {
this.maxChannels = amqpConfig.maxNoOfChannels;
this.maxFrameSize = amqpConfig.maxFrameSize;
this.heartBeat = amqpConfig.heartBeat;
- this.amqpInOutputConverter = new AmqpInOutputConverter(this);
- //this.amqpOutputConverter = new AmqpMessageSender(this);
+ this.amqpOutputConverter = new AmqpMessageSender(this);
+ }
+
+ public AmqpMessageSender getAmqpOutputConverter() {
+ return amqpOutputConverter;
}
@Override
@@ -625,8 +628,4 @@ public class AmqpConnection extends AmqpHandler {
return ctx.channel().isActive();
}
- public AmqpInOutputConverter getAmqpInOutputConverter() {
- return amqpInOutputConverter;
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org