You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/24 02:21:25 UTC
[incubator-eventmesh] branch protocol-amqp updated: define interface of converter for conversion between AmqpMessage and AMQData in input stream and output stream.
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 90a9e534 define interface of converter for conversion between AmqpMessage and AMQData in input stream and output stream.
new 309a1710 Merge pull request #1791 from jackyluo-learning/protocol-amqp-shadow2
90a9e534 is described below
commit 90a9e5342f98798bd31b0b35d5b935fca96a2beb
Author: jackyluo <15...@qq.com>
AuthorDate: Mon Oct 24 10:07:22 2022 +0800
define interface of converter for conversion between AmqpMessage and AMQData in input stream and output stream.
---
.../protocol/amqp/processor/AmqpConnection.java | 7 ++++++
.../protocol/amqp/util/AmqpInOutputConverter.java | 29 ++++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
index 223efc08..a3f9e22e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
@@ -30,6 +30,7 @@ import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeDe
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolFrame;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolVersion;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.AmqpInOutputConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,7 @@ public class AmqpConnection extends AmqpHandler {
private String virtualHostName;
private final Object channelAddRemoveLock = new Object();
private AtomicBoolean blocked = new AtomicBoolean();
+ private AmqpInOutputConverter amqpInOutputConverter;
//private AmqpMessageSender amqpOutputConverter;
@@ -96,6 +98,7 @@ public class AmqpConnection extends AmqpHandler {
this.maxChannels = amqpConfig.maxNoOfChannels;
this.maxFrameSize = amqpConfig.maxFrameSize;
this.heartBeat = amqpConfig.heartBeat;
+ this.amqpInOutputConverter = new AmqpInOutputConverter(this);
//this.amqpOutputConverter = new AmqpMessageSender(this);
}
@@ -622,4 +625,8 @@ public class AmqpConnection extends AmqpHandler {
return ctx.channel().isActive();
}
+ public AmqpInOutputConverter getAmqpInOutputConverter() {
+ return amqpInOutputConverter;
+ }
+
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/util/AmqpInOutputConverter.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/util/AmqpInOutputConverter.java
new file mode 100644
index 00000000..5dd5948f
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/util/AmqpInOutputConverter.java
@@ -0,0 +1,29 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.util;
+
+import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
+import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpConnection;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
+
+/**
+ * Converts between AmqpMessage and AMQData for AMQP input and output; both conversions are still unimplemented stubs.
+ */
+public class AmqpInOutputConverter {
+
+ private AmqpConnection amqpConnection;
+
+ public AmqpInOutputConverter(AmqpConnection amqpConnection) {
+ this.amqpConnection = amqpConnection;
+ }
+
+ public AMQData convertOutput(AmqpMessage amqpMessage) {
+ AMQData amqData = null;
+ // TODO: 2022/10/21 implement AmqpMessage -> AMQData conversion; until then this method always returns null
+ return amqData;
+ }
+
+ public AmqpMessage convertInput(AMQData amqData) {
+ AmqpMessage amqpMessage = new AmqpMessage();
+ // TODO: 2022/10/21 implement AMQData -> AmqpMessage conversion; until then amqData is not read and a newly constructed AmqpMessage is returned
+ return amqpMessage;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org