You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/24 02:21:25 UTC

[incubator-eventmesh] branch protocol-amqp updated: define interface of converter for conversion between AmqpMessage and AMQData in input stream and output stream.

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/protocol-amqp by this push:
     new 90a9e534 define interface of converter for conversion between AmqpMessage and AMQData in input stream and output stream.
     new 309a1710 Merge pull request #1791 from jackyluo-learning/protocol-amqp-shadow2
90a9e534 is described below

commit 90a9e5342f98798bd31b0b35d5b935fca96a2beb
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Oct 24 10:07:22 2022 +0800

    define interface of converter for conversion between AmqpMessage and AMQData in input stream and output stream.
---
 .../protocol/amqp/processor/AmqpConnection.java    |  7 ++++++
 .../protocol/amqp/util/AmqpInOutputConverter.java  | 29 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
index 223efc08..a3f9e22e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
@@ -30,6 +30,7 @@ import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeDe
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolFrame;
 import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolVersion;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.AmqpInOutputConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,7 @@ public class AmqpConnection extends AmqpHandler {
     private String virtualHostName;
     private final Object channelAddRemoveLock = new Object();
     private AtomicBoolean blocked = new AtomicBoolean();
+    private AmqpInOutputConverter amqpInOutputConverter;
     //private AmqpMessageSender amqpOutputConverter;
 
 
@@ -96,6 +98,7 @@ public class AmqpConnection extends AmqpHandler {
         this.maxChannels = amqpConfig.maxNoOfChannels;
         this.maxFrameSize = amqpConfig.maxFrameSize;
         this.heartBeat = amqpConfig.heartBeat;
+        this.amqpInOutputConverter = new AmqpInOutputConverter(this);
         //this.amqpOutputConverter = new AmqpMessageSender(this);
     }
 
@@ -622,4 +625,8 @@ public class AmqpConnection extends AmqpHandler {
         return ctx.channel().isActive();
     }
 
+    public AmqpInOutputConverter getAmqpInOutputConverter() {
+        return amqpInOutputConverter;
+    }
+
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/util/AmqpInOutputConverter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/util/AmqpInOutputConverter.java
new file mode 100644
index 00000000..5dd5948f
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/util/AmqpInOutputConverter.java
@@ -0,0 +1,29 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.util;
+
+import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpConnection;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
+
+/**
+ * Converts between AmqpMessage and AMQData for AMQP input and output.
+ */
+public class AmqpInOutputConverter {
+
+    private AmqpConnection amqpConnection;
+
+    public AmqpInOutputConverter(AmqpConnection amqpConnection) {
+        this.amqpConnection = amqpConnection;
+    }
+
+    public AMQData convertOutput(AmqpMessage amqpMessage) {
+        AMQData amqData = null;
+        // TODO: 2022/10/21 conversion
+        return amqData;
+    }
+
+    public AmqpMessage convertInput(AMQData amqData) {
+        AmqpMessage amqpMessage = new AmqpMessage();
+        // TODO: 2022/10/21 conversion
+        return amqpMessage;
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org