You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/24 09:01:25 UTC

[incubator-eventmesh] branch protocol-amqp updated: 1. add amqpConsumer class that can handle message pushing 2. add PushMessageContext that wrap some context attributes which can be used during pushing message to client

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 092da06f 1. add amqpConsumer class that can handle message pushing 2. add PushMessageContext that wrap some context attributes which can be used during pushing message to client
     new 6c2935e6 Merge pull request #1793 from jackyluo-learning/protocol-amqp-shadow3
092da06f is described below

commit 092da06f35bf8a8a801d5b147244fb4b76bb139d
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Oct 24 10:38:58 2022 +0800

    1. add amqpConsumer class that can handle message pushing
    2. add PushMessageContext that wrap some context attributes which can be used during pushing message to client
---
 .../core/protocol/amqp/consumer/AmqpConsumer.java  |  32 +++++
 .../protocol/amqp/consumer/AmqpConsumerImpl.java   | 142 +++++++++++++++++++++
 .../protocol/amqp/consumer/PushMessageContext.java |  44 +++++++
 .../core/protocol/amqp/processor/AmqpChannel.java  |  25 +++-
 4 files changed, 236 insertions(+), 7 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumer.java
new file mode 100644
index 00000000..a8c5757a
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumer.java
@@ -0,0 +1,32 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+import java.util.List;
+
+/**
+ * AMQP Consumer, consume message from MQ and push to client
+ */
+public interface AmqpConsumer {
+
+    void pushMessage(PushMessageContext pushMessageContext);
+
+    void messageRedeliver(Object messageId);
+
+    void messageRedeliver(List<Object> messageIds);
+
+    /**
+     * messageId Not defined yet
+     * @param messageId
+     * @throws Exception
+     */
+    void messageAck(String messageId) throws Exception;
+
+    void close();
+
+    String getConsumerId();
+
+    String getConsumerTag();
+
+    String getQueue();
+
+    void notifyConsumer();
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
new file mode 100644
index 00000000..fb5f3307
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java
@@ -0,0 +1,142 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
+import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
+import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
+import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
+import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.cloudevents.CloudEvent;
+import io.netty.channel.ChannelFutureListener;
+
+public class AmqpConsumerImpl implements AmqpConsumer {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * each consumer should be a tag that used to identify itself only within a channel.
+     * each consumerTag should be unique within a channel.
+     */
+    private String consumerTag;
+
+    /**
+     * id
+     */
+    private String consumerId;
+
+    private boolean autoAck;
+
+    /**
+     * a queue which current consumer wants to consume from
+     */
+    private String queueName;
+
+    /**
+     * amqpChannel that current consumer used
+     */
+    private AmqpChannel amqpChannel;
+
+    /**
+     * a map that store all un ack message which has been pushed to client
+     */
+    private ConcurrentHashMap<String, PushMessageContext> unAckMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void pushMessage(PushMessageContext pushMessageContext) {
+        // TODO: 2022/10/12 push to client
+        AmqpMessage amqpMessage = new AmqpMessage();
+        String protocolType = Objects.requireNonNull(pushMessageContext.getCloudEvent().getExtension(Constants.PROTOCOL_TYPE)).toString();
+        ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
+        try {
+            amqpMessage = (AmqpMessage) protocolAdaptor.fromCloudEvent(pushMessageContext.getCloudEvent());
+        } catch (ProtocolHandleException e) {
+            // TODO: 2022/10/20 exception handle
+            throw new RuntimeException(e);
+        }
+        long deliveryTag = this.amqpChannel.getNextDeliveryTag();
+        if (!autoAck) {
+            addUnAckMsg(deliveryTag, pushMessageContext);
+        }
+
+        // TODO: 2022/10/20 convert AmqpMessage to AMQData
+        AMQData amqData = this.amqpChannel.getConnection().getAmqpInOutputConverter().convertOutput(amqpMessage);
+
+        this.amqpChannel.getConnection().getCtx().writeAndFlush(amqData).addListener(
+                (ChannelFutureListener) future -> {
+                    if (!future.isSuccess()) {
+                        logger.error("push message fail, amqData: {}", amqData);
+                        // TODO: 2022/10/21 retry strategy
+                    } else {
+                        logger.info("push message success, amqData: {}", amqData);
+                        // TODO: 2022/10/21 push success strategy
+                    }
+                }
+        );
+    }
+
+    @Override
+    public void messageRedeliver(Object messageId) {
+
+    }
+
+    @Override
+    public void messageRedeliver(List<Object> messageIds) {
+
+    }
+
+    @Override
+    public void messageAck(String messageId) throws Exception {
+        this.unAckMap.remove(messageId);
+        PushMessageContext pushMessageContext = this.unAckMap.get(messageId);
+        MQConsumerWrapper consumer = pushMessageContext.getMqConsumerWrapper();
+        AbstractContext context = pushMessageContext.getConsumeConcurrentlyContext();
+        List<CloudEvent> cloudEventList = new ArrayList<>();
+        cloudEventList.add(pushMessageContext.getCloudEvent());
+        consumer.updateOffset(cloudEventList, context);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    @Override
+    public String getConsumerTag() {
+        return null;
+    }
+
+    @Override
+    public String getQueue() {
+        return null;
+    }
+
+    @Override
+    public void notifyConsumer() {
+
+    }
+
+    /**
+     * 添加unAck msg
+     */
+    private void addUnAckMsg(Long deliveryTag, PushMessageContext pushMessageContext) {
+        this.amqpChannel.getUnackMessageMap().put(deliveryTag, pushMessageContext);
+        this.unAckMap.put(pushMessageContext.getMessageId(), pushMessageContext);
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/PushMessageContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/PushMessageContext.java
new file mode 100644
index 00000000..7269b24d
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/PushMessageContext.java
@@ -0,0 +1,44 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
+
+import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
+import org.apache.eventmesh.runtime.util.ServerGlobal;
+
+import io.cloudevents.CloudEvent;
+
+import lombok.Data;
+
+
+/**
+ * context of message when pushing
+ */
+@Data
+public class PushMessageContext {
+
+    /**
+     * cloudEvent receive from mesh mq
+     */
+    private CloudEvent cloudEvent;
+
+    /**
+     * a global messageId
+     */
+    private String messageId;
+
+    /**
+     * times that retry to be pushed
+     */
+    private int retryTimes;
+
+    private MQConsumerWrapper mqConsumerWrapper;
+
+    private AbstractContext consumeConcurrentlyContext;
+
+    public PushMessageContext(CloudEvent cloudEvent, MQConsumerWrapper mqConsumerWrapper, AbstractContext consumeConcurrentlyContext) {
+        this.cloudEvent = cloudEvent;
+        this.messageId = ServerGlobal.getInstance().getMsgCounter().toString();
+        this.retryTimes = 0;
+        this.mqConsumerWrapper = mqConsumerWrapper;
+        this.consumeConcurrentlyContext = consumeConcurrentlyContext;
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java
index 72e4df73..752c3688 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java
@@ -11,11 +11,12 @@
 
 package org.apache.eventmesh.runtime.core.protocol.amqp.processor;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.impl.AMQCommand;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes.INTERNAL_ERROR;
+import static org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes.NOT_FOUND;
+
 import org.apache.eventmesh.runtime.boot.EventMeshAmqpServer;
 import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
+import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.PushMessageContext;
 import org.apache.eventmesh.runtime.core.protocol.amqp.exception.AmqpException;
 import org.apache.eventmesh.runtime.core.protocol.amqp.exchange.ExchangeDefaults;
 import org.apache.eventmesh.runtime.core.protocol.amqp.metadata.model.ExchangeInfo;
@@ -25,21 +26,25 @@ import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCo
 import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeService;
 import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueService;
 import org.apache.eventmesh.runtime.core.protocol.amqp.util.NameUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes.INTERNAL_ERROR;
-import static org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes.NOT_FOUND;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.AMQCommand;
 
 /**
  * Amqp Channel level method processor.
@@ -98,6 +103,12 @@ public class AmqpChannel implements ChannelMethodProcessor {
 
     private String virtualHostName;
 
+    private Map<Long, PushMessageContext> unackMessageMap = new ConcurrentHashMap<>();
+
+    public Map<Long, PushMessageContext> getUnackMessageMap() {
+        return unackMessageMap;
+    }
+
     public AmqpChannel(int channelId, AmqpConnection connection) {
         this.channelId = channelId;
         this.connection = connection;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org