You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/15 15:09:52 UTC
[incubator-eventmesh] branch protocol-amqp updated: realization command factory
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 76dc4f0f realization command factory
new 3cf61f8a Merge pull request #1610 from wangshaojie4039/protocol-amqp
76dc4f0f is described below
commit 76dc4f0f465387c2b0903bcdf5189ad4afdf72a9
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Sat Oct 15 20:12:53 2022 +0800
realization command factory
---
.../protocol/amqp/remoting/CommandFactory.java | 481 +++++++++++++++++++++
1 file changed, 481 insertions(+)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/CommandFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/CommandFactory.java
new file mode 100644
index 00000000..3f246c9b
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/remoting/CommandFactory.java
@@ -0,0 +1,481 @@
+package org.apache.eventmesh.runtime.core.protocol.amqp.remoting;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.AMQImpl;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolVersion;
+
+import java.util.Map;
+
+ /**
+  * Factory for AMQP method objects backed by the RabbitMQ client's {@link AMQImpl} classes.
+  *
+  * <p>Every {@code createXxxBody} method forwards its arguments to the same-named setters of the
+  * corresponding builder (conversions are noted on the individual methods), or instantiates the
+  * argument-less method class directly. The factory also holds a mutable
+  * {@link ProtocolVersion}; none of the factory methods in this class read it.
+  */
+ public class CommandFactory {
+
+     /** Protocol version associated with this factory; replaceable via the setter. */
+     private ProtocolVersion protocolVersion;
+
+     /**
+      * Creates a factory bound to the given protocol version.
+      *
+      * @param pv the protocol version to store
+      */
+     public CommandFactory(ProtocolVersion pv) {
+         this.protocolVersion = pv;
+     }
+
+     /**
+      * Replaces the stored protocol version.
+      *
+      * @param protocolVersion the new protocol version
+      */
+     public void setProtocolVersion(final ProtocolVersion protocolVersion) {
+         this.protocolVersion = protocolVersion;
+     }
+
+     /**
+      * Returns the stored protocol version.
+      *
+      * @return the value last supplied through the constructor or the setter
+      */
+     public ProtocolVersion getProtocolVersion() {
+         return protocolVersion;
+     }
+
+     // ------------------------------------------------------------------ Access
+
+     /** Builds an {@code Access.Request} method from the given field values. */
+     public final AMQP.Access.Request createAccessRequestBody(final String realm,
+                                                              final boolean exclusive,
+                                                              final boolean passive,
+                                                              final boolean active,
+                                                              final boolean write,
+                                                              final boolean read) {
+         return new AMQImpl.Access.Request.Builder()
+             .realm(realm)
+             .exclusive(exclusive)
+             .passive(passive)
+             .active(active)
+             .write(write)
+             .read(read)
+             .build();
+     }
+
+     /** Builds an {@code Access.RequestOk} method carrying the given ticket. */
+     public final AMQP.Access.RequestOk createAccessRequestOkBody(final int ticket) {
+         return new AMQImpl.Access.RequestOk.Builder()
+             .ticket(ticket)
+             .build();
+     }
+
+     // ------------------------------------------------------------------ Basic
+
+     /** Builds a {@code Basic.Publish} method from the given field values. */
+     public final AMQP.Basic.Publish createBasicPublishBody(final int ticket,
+                                                            final String exchange,
+                                                            final String routingKey,
+                                                            final boolean mandatory,
+                                                            final boolean immediate) {
+         return new AMQImpl.Basic.Publish.Builder()
+             .ticket(ticket)
+             .exchange(exchange)
+             .routingKey(routingKey)
+             .mandatory(mandatory)
+             .immediate(immediate)
+             .build();
+     }
+
+     /** Builds a {@code Basic.Ack} method from the given field values. */
+     public final AMQP.Basic.Ack createBasicAckBody(final long deliveryTag,
+                                                    final boolean multiple) {
+         return new AMQImpl.Basic.Ack.Builder()
+             .deliveryTag(deliveryTag)
+             .multiple(multiple)
+             .build();
+     }
+
+     /** Builds a {@code Basic.Nack} method from the given field values. */
+     public final AMQP.Basic.Nack createBasicNackBody(long deliveryTag,
+                                                      boolean multiple,
+                                                      boolean requeue) {
+         return new AMQImpl.Basic.Nack.Builder()
+             .deliveryTag(deliveryTag)
+             .multiple(multiple)
+             .requeue(requeue)
+             .build();
+     }
+
+     /** Creates a {@code Basic.RecoverOk} method, which takes no arguments. */
+     public final AMQP.Basic.RecoverOk createBasicRecoverOkBody() {
+         return new AMQImpl.Basic.RecoverOk();
+     }
+
+     /** Creates a {@code Basic.QosOk} method, which takes no arguments. */
+     public final AMQP.Basic.QosOk createBasicQosOkBody() {
+         return new AMQImpl.Basic.QosOk();
+     }
+
+     /** Builds a {@code Basic.Consume} method from the given field values. */
+     public final AMQP.Basic.Consume createBasicConsumeBody(final int ticket,
+                                                            final String queue,
+                                                            final String consumerTag,
+                                                            final boolean noLocal,
+                                                            final boolean noAck,
+                                                            final boolean exclusive,
+                                                            final boolean nowait,
+                                                            final Map<String, Object> arguments) {
+         return new AMQImpl.Basic.Consume.Builder()
+             .ticket(ticket)
+             .queue(queue)
+             .consumerTag(consumerTag)
+             .noLocal(noLocal)
+             .noAck(noAck)
+             .exclusive(exclusive)
+             .nowait(nowait)
+             .arguments(arguments)
+             .build();
+     }
+
+     /** Builds a {@code Basic.ConsumeOk} method for the given consumer tag. */
+     public final AMQP.Basic.ConsumeOk createBasicConsumeOkBody(final String consumerTag) {
+         return new AMQImpl.Basic.ConsumeOk.Builder()
+             .consumerTag(consumerTag)
+             .build();
+     }
+
+     /** Builds a {@code Basic.Cancel} method from the given field values. */
+     public final AMQP.Basic.Cancel createBasicCancelBody(final String consumerTag,
+                                                          final boolean nowait) {
+         return new AMQImpl.Basic.Cancel.Builder()
+             .consumerTag(consumerTag)
+             .nowait(nowait)
+             .build();
+     }
+
+     /** Builds a {@code Basic.CancelOk} method for the given consumer tag. */
+     public final AMQP.Basic.CancelOk createBasicCancelOkBody(final String consumerTag) {
+         return new AMQImpl.Basic.CancelOk.Builder()
+             .consumerTag(consumerTag)
+             .build();
+     }
+
+     /** Builds a {@code Basic.Return} method from the given field values. */
+     public final AMQP.Basic.Return createBasicReturnBody(final int replyCode,
+                                                          final String replyText,
+                                                          final String exchange,
+                                                          final String routingKey) {
+         return new AMQImpl.Basic.Return.Builder()
+             .replyCode(replyCode)
+             .replyText(replyText)
+             .exchange(exchange)
+             .routingKey(routingKey)
+             .build();
+     }
+
+     /** Builds a {@code Basic.Deliver} method from the given field values. */
+     public final AMQP.Basic.Deliver createBasicDeliverBody(final String consumerTag,
+                                                            final long deliveryTag,
+                                                            final boolean redelivered,
+                                                            final String exchange,
+                                                            final String routingKey) {
+         return new AMQImpl.Basic.Deliver.Builder()
+             .consumerTag(consumerTag)
+             .deliveryTag(deliveryTag)
+             .redelivered(redelivered)
+             .exchange(exchange)
+             .routingKey(routingKey)
+             .build();
+     }
+
+     /** Builds a {@code Basic.Get} method from the given field values. */
+     public final AMQP.Basic.Get createBasicGetBody(int ticket, String queue, boolean noAck) {
+         return new AMQImpl.Basic.Get.Builder()
+             .ticket(ticket)
+             .queue(queue)
+             .noAck(noAck)
+             .build();
+     }
+
+     /**
+      * Builds a {@code Basic.GetOk} method from the given field values.
+      *
+      * <p>{@code messageCount} is narrowed to {@code int} with a plain cast, so a value outside
+      * the {@code int} range wraps instead of raising an error.
+      */
+     public final AMQP.Basic.GetOk createBasicGetOkBody(final long deliveryTag,
+                                                        final boolean redelivered,
+                                                        final String exchange,
+                                                        final String routingKey,
+                                                        final long messageCount) {
+         return new AMQImpl.Basic.GetOk.Builder()
+             .deliveryTag(deliveryTag)
+             .redelivered(redelivered)
+             .exchange(exchange)
+             .routingKey(routingKey)
+             .messageCount((int) messageCount)
+             .build();
+     }
+
+     /** Builds a {@code Basic.GetEmpty} method carrying the given cluster id. */
+     public final AMQP.Basic.GetEmpty createBasicGetEmptyBody(final String clusterId) {
+         return new AMQImpl.Basic.GetEmpty.Builder()
+             .clusterId(clusterId)
+             .build();
+     }
+
+     // ------------------------------------------------------------------ Channel
+
+     /**
+      * Builds a {@code Channel.OpenOk} method.
+      *
+      * <p>The numeric {@code channelId} is handed to the builder as its decimal string form.
+      */
+     public final AMQP.Channel.OpenOk createChannelOpenOkBody(int channelId) {
+         return new AMQImpl.Channel.OpenOk.Builder()
+             .channelId(Integer.toString(channelId))
+             .build();
+     }
+
+     /** Builds a {@code Channel.FlowOk} method with the given active flag. */
+     public final AMQP.Channel.FlowOk createChannelFlowOkBody(final boolean active) {
+         return new AMQImpl.Channel.FlowOk.Builder()
+             .active(active)
+             .build();
+     }
+
+     /** Builds a {@code Channel.Close} method from the given field values. */
+     public final AMQP.Channel.Close createChannelCloseBody(final int replyCode,
+                                                            final String replyText,
+                                                            final int classId,
+                                                            final int methodId) {
+         return new AMQImpl.Channel.Close.Builder()
+             .replyCode(replyCode)
+             .replyText(replyText)
+             .classId(classId)
+             .methodId(methodId)
+             .build();
+     }
+
+     /** Creates a {@code Channel.CloseOk} method, which takes no arguments. */
+     public final AMQP.Channel.CloseOk createChannelCloseOkBody() {
+         return new AMQImpl.Channel.CloseOk();
+     }
+
+     /** Builds a {@code Channel.Open} method with the given out-of-band value. */
+     public final AMQP.Channel.Open createChannelOpenBody(final String outOfBand) {
+         return new AMQImpl.Channel.Open.Builder()
+             .outOfBand(outOfBand)
+             .build();
+     }
+
+     // ------------------------------------------------------------------ Connection
+
+     /** Builds a {@code Connection.Start} method from the given field values. */
+     public final AMQP.Connection.Start createConnectionStartBody(final short versionMajor,
+                                                                  final short versionMinor,
+                                                                  final Map<String, Object> serverProperties,
+                                                                  final String mechanisms,
+                                                                  final String locales) {
+         return new AMQImpl.Connection.Start.Builder()
+             .versionMajor(versionMajor)
+             .versionMinor(versionMinor)
+             .serverProperties(serverProperties)
+             .mechanisms(mechanisms)
+             .locales(locales)
+             .build();
+     }
+
+     /** Builds a {@code Connection.Secure} method carrying the given challenge. */
+     public final AMQP.Connection.Secure createConnectionSecureBody(final String challenge) {
+         return new AMQImpl.Connection.Secure.Builder()
+             .challenge(challenge)
+             .build();
+     }
+
+     /** Builds a {@code Connection.SecureOk} method carrying the given response. */
+     public final AMQP.Connection.SecureOk createConnectionSecureOkBody(final String response) {
+         return new AMQImpl.Connection.SecureOk.Builder()
+             .response(response)
+             .build();
+     }
+
+     /**
+      * Builds a {@code Connection.Tune} method from the given field values.
+      *
+      * <p>{@code frameMax} is narrowed to {@code int} with a plain cast, so a value outside the
+      * {@code int} range wraps instead of raising an error.
+      */
+     public final AMQP.Connection.Tune createConnectionTuneBody(final int channelMax,
+                                                                final long frameMax,
+                                                                final int heartbeat) {
+         return new AMQImpl.Connection.Tune.Builder()
+             .channelMax(channelMax)
+             .frameMax((int) frameMax)
+             .heartbeat(heartbeat)
+             .build();
+     }
+
+     /**
+      * Builds a {@code Connection.TuneOk} method from the given field values.
+      *
+      * <p>{@code frameMax} is narrowed to {@code int} with a plain cast, so a value outside the
+      * {@code int} range wraps instead of raising an error.
+      */
+     public final AMQP.Connection.TuneOk createConnectionTuneOkBody(final int channelMax,
+                                                                    final long frameMax,
+                                                                    final int heartbeat) {
+         return new AMQImpl.Connection.TuneOk.Builder()
+             .channelMax(channelMax)
+             .frameMax((int) frameMax)
+             .heartbeat(heartbeat)
+             .build();
+     }
+
+     /** Builds a {@code Connection.Open} method from the given field values. */
+     public final AMQP.Connection.Open createConnectionOpenBody(final String virtualHost,
+                                                                final String capabilities,
+                                                                final boolean insist) {
+         return new AMQImpl.Connection.Open.Builder()
+             .virtualHost(virtualHost)
+             .capabilities(capabilities)
+             .insist(insist)
+             .build();
+     }
+
+     /** Builds a {@code Connection.OpenOk} method carrying the given known-hosts value. */
+     public final AMQP.Connection.OpenOk createConnectionOpenOkBody(final String knownHosts) {
+         return new AMQImpl.Connection.OpenOk.Builder()
+             .knownHosts(knownHosts)
+             .build();
+     }
+
+     /** Builds a {@code Connection.StartOk} method from the given field values. */
+     public final AMQP.Connection.StartOk createConnectionStartOkBody(final Map<String, Object> clientProperties,
+                                                                      final String mechanism,
+                                                                      final String response,
+                                                                      final String locale) {
+         return new AMQImpl.Connection.StartOk.Builder()
+             .clientProperties(clientProperties)
+             .mechanism(mechanism)
+             .response(response)
+             .locale(locale)
+             .build();
+     }
+
+     /** Creates a {@code Connection.CloseOk} method, which takes no arguments. */
+     public final AMQP.Connection.CloseOk createConnectionCloseOkBody() {
+         return new AMQImpl.Connection.CloseOk();
+     }
+
+     /** Builds a {@code Connection.Close} method from the given field values. */
+     public final AMQP.Connection.Close createConnectionCloseBody(final int replyCode,
+                                                                  final String replyText,
+                                                                  final int classId,
+                                                                  final int methodId) {
+         return new AMQImpl.Connection.Close.Builder()
+             .replyCode(replyCode)
+             .replyText(replyText)
+             .classId(classId)
+             .methodId(methodId)
+             .build();
+     }
+
+     // ------------------------------------------------------------------ Exchange
+
+     /** Builds an {@code Exchange.Declare} method from the given field values. */
+     public final AMQP.Exchange.Declare createExchangeDeclareBody(final int ticket,
+                                                                  final String exchange,
+                                                                  final String type,
+                                                                  final boolean passive,
+                                                                  final boolean durable,
+                                                                  final boolean autoDelete,
+                                                                  final boolean internal,
+                                                                  final boolean nowait,
+                                                                  final Map<String, Object> arguments) {
+         return new AMQImpl.Exchange.Declare.Builder()
+             .ticket(ticket)
+             .exchange(exchange)
+             .type(type)
+             .passive(passive)
+             .durable(durable)
+             .autoDelete(autoDelete)
+             .internal(internal)
+             .nowait(nowait)
+             .arguments(arguments)
+             .build();
+     }
+
+     /** Creates an {@code Exchange.DeclareOk} method, which takes no arguments. */
+     public final AMQP.Exchange.DeclareOk createExchangeDeclareOkBody() {
+         return new AMQImpl.Exchange.DeclareOk();
+     }
+
+     /** Builds an {@code Exchange.Delete} method from the given field values. */
+     public final AMQP.Exchange.Delete createExchangeDeleteBody(final int ticket,
+                                                                final String exchange,
+                                                                final boolean ifUnused,
+                                                                final boolean nowait) {
+         return new AMQImpl.Exchange.Delete.Builder()
+             .ticket(ticket)
+             .exchange(exchange)
+             .ifUnused(ifUnused)
+             .nowait(nowait)
+             .build();
+     }
+
+     /** Creates an {@code Exchange.DeleteOk} method, which takes no arguments. */
+     public final AMQP.Exchange.DeleteOk createExchangeDeleteOkBody() {
+         return new AMQImpl.Exchange.DeleteOk();
+     }
+
+     /** Creates an {@code Exchange.BindOk} method, which takes no arguments. */
+     public final AMQP.Exchange.BindOk createExchangeBindOkBody() {
+         return new AMQImpl.Exchange.BindOk();
+     }
+
+     // ------------------------------------------------------------------ Queue
+
+     /** Builds a {@code Queue.Declare} method from the given field values. */
+     public final AMQP.Queue.Declare createQueueDeclareBody(final int ticket,
+                                                            final String queue,
+                                                            final boolean passive,
+                                                            final boolean durable,
+                                                            final boolean exclusive,
+                                                            final boolean autoDelete,
+                                                            final boolean nowait,
+                                                            final Map<String, Object> arguments) {
+         return new AMQImpl.Queue.Declare.Builder()
+             .ticket(ticket)
+             .queue(queue)
+             .passive(passive)
+             .durable(durable)
+             .exclusive(exclusive)
+             .autoDelete(autoDelete)
+             .nowait(nowait)
+             .arguments(arguments)
+             .build();
+     }
+
+     /** Builds a {@code Queue.DeclareOk} method from the given field values. */
+     public final AMQP.Queue.DeclareOk createQueueDeclareOkBody(final String queue,
+                                                                final int messageCount,
+                                                                final int consumerCount) {
+         return new AMQImpl.Queue.DeclareOk.Builder()
+             .queue(queue)
+             .messageCount(messageCount)
+             .consumerCount(consumerCount)
+             .build();
+     }
+
+     /** Creates a {@code Queue.BindOk} method, which takes no arguments. */
+     public final AMQP.Queue.BindOk createQueueBindOkBody() {
+         return new AMQImpl.Queue.BindOk();
+     }
+
+     /** Builds a {@code Queue.DeleteOk} method carrying the given message count. */
+     public final AMQP.Queue.DeleteOk createQueueDeleteOkBody(final int messageCount) {
+         return new AMQImpl.Queue.DeleteOk.Builder()
+             .messageCount(messageCount)
+             .build();
+     }
+
+     /** Builds a {@code Queue.Bind} method from the given field values. */
+     public final AMQP.Queue.Bind createQueueBindBody(final int ticket,
+                                                      final String queue,
+                                                      final String exchange,
+                                                      final String routingKey,
+                                                      final boolean nowait,
+                                                      final Map<String, Object> arguments) {
+         return new AMQImpl.Queue.Bind.Builder()
+             .ticket(ticket)
+             .queue(queue)
+             .exchange(exchange)
+             .routingKey(routingKey)
+             .nowait(nowait)
+             .arguments(arguments)
+             .build();
+     }
+
+     /** Creates a {@code Queue.UnbindOk} method, which takes no arguments. */
+     public final AMQP.Queue.UnbindOk createQueueUnbindOkBody() {
+         return new AMQImpl.Queue.UnbindOk();
+     }
+
+     /** Builds a {@code Queue.Unbind} method from the given field values. */
+     public final AMQP.Queue.Unbind createQueueUnbindBody(final int ticket,
+                                                          final String queue,
+                                                          final String exchange,
+                                                          final String routingKey,
+                                                          final Map<String, Object> arguments) {
+         // Reached through AMQImpl like every other builder in this class.
+         return new AMQImpl.Queue.Unbind.Builder()
+             .ticket(ticket)
+             .queue(queue)
+             .exchange(exchange)
+             .routingKey(routingKey)
+             .arguments(arguments)
+             .build();
+     }
+
+     /** Builds a {@code Queue.Purge} method from the given field values. */
+     public final AMQP.Queue.Purge createQueuePurgeBody(final int ticket,
+                                                        final String queue,
+                                                        final boolean nowait) {
+         return new AMQImpl.Queue.Purge.Builder()
+             .ticket(ticket)
+             .queue(queue)
+             .nowait(nowait)
+             .build();
+     }
+
+     /** Builds a {@code Queue.PurgeOk} method carrying the given message count. */
+     public final AMQP.Queue.PurgeOk createQueuePurgeOkBody(int messageCount) {
+         return new AMQImpl.Queue.PurgeOk.Builder()
+             .messageCount(messageCount)
+             .build();
+     }
+
+     /** Builds a {@code Queue.Delete} method from the given field values. */
+     public final AMQP.Queue.Delete createQueueDeleteBody(final int ticket,
+                                                          final String queue,
+                                                          final boolean ifUnused,
+                                                          final boolean ifEmpty,
+                                                          final boolean nowait) {
+         return new AMQImpl.Queue.Delete.Builder()
+             // Forward the ticket like the other ticket-bearing factories do; it used to be dropped.
+             .ticket(ticket)
+             .queue(queue)
+             .ifUnused(ifUnused)
+             .ifEmpty(ifEmpty)
+             .nowait(nowait)
+             .build();
+     }
+
+     // ------------------------------------------------------------------ Confirm
+
+     /** Creates a {@code Confirm.SelectOk} method, which takes no arguments. */
+     public final AMQP.Confirm.SelectOk createConfirmSelectOkBody() {
+         return new AMQImpl.Confirm.SelectOk();
+     }
+
+     /** Builds a {@code Confirm.Select} method with the given nowait flag. */
+     public final AMQP.Confirm.Select createConfirmSelectBody(boolean nowait) {
+         return new AMQImpl.Confirm.Select.Builder()
+             .nowait(nowait)
+             .build();
+     }
+
+     // ------------------------------------------------------------------ Tx
+
+     /** Creates a {@code Tx.SelectOk} method, which takes no arguments. */
+     public final AMQP.Tx.SelectOk createTxSelectOkBody() {
+         return new AMQImpl.Tx.SelectOk();
+     }
+
+     /** Creates a {@code Tx.CommitOk} method, which takes no arguments. */
+     public final AMQP.Tx.CommitOk createTxCommitOkBody() {
+         return new AMQImpl.Tx.CommitOk();
+     }
+
+     /** Creates a {@code Tx.RollbackOk} method, which takes no arguments. */
+     public final AMQP.Tx.RollbackOk createTxRollbackOkBody() {
+         return new AMQImpl.Tx.RollbackOk();
+     }
+
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org