You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/16 02:52:31 UTC
[incubator-eventmesh] branch protocol-amqp updated: Add amqp command process
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 78fa3d32 Add amqp command process
new 5bb820aa Merge pull request #1612 from wangshaojie4039/protocol-amqp
78fa3d32 is described below
commit 78fa3d325e04f63f803e293503a00ad8902a6747
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Sun Oct 16 08:19:19 2022 +0800
Add amqp command process
---
.../core/protocol/amqp/ExchangeContainer.java | 60 --
.../core/protocol/amqp/ExchangeService.java | 35 -
.../core/protocol/amqp/MetaMessageService.java | 35 -
.../core/protocol/amqp/processor/AmqpChannel.java | 702 +++++++++++++++++----
4 files changed, 572 insertions(+), 260 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/ExchangeContainer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/ExchangeContainer.java
deleted file mode 100644
index 9e1aaf32..00000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/ExchangeContainer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.eventmesh.runtime.core.protocol.amqp;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.metamodels.AmqpExchange;
-import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.metamodels.AmqpExchange;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * manage all exchanges used in server
- */
-public class ExchangeContainer {
-
- private static final Logger logger = LoggerFactory.getLogger(ExchangeContainer.class);
-
- private Map<VirtualHost, Map<String, AmqpExchange>> exchangeMap;
-
- public ExchangeContainer () {
- this.exchangeMap = new ConcurrentHashMap<>();
- }
-
- public AmqpExchange getOrCreateExchange(VirtualHost virtualHost, String exchangeName, String exchangeType, boolean passive, boolean durable, boolean autoDelete, boolean internal) {
- if (StringUtils.isEmpty(exchangeType) && passive) {
- logger.error("[{}][{}] ExchangeType should be set when createIfMissing is true.", virtualHost, exchangeName);
- return null;
- }
- if (virtualHost == null || StringUtils.isEmpty(exchangeName)) {
- logger.error("[{}][{}] Parameter error, namespaceName or exchangeName is empty.", virtualHost, exchangeName);
- return null;
- }
-
- AmqpExchange.Type type = AmqpExchange.Type.value(exchangeType);
- AmqpExchange amqpExchange = new AmqpExchange(exchangeName, type, passive, durable, autoDelete, internal);
-
- this.exchangeMap.putIfAbsent(virtualHost, new ConcurrentHashMap<>());
- AmqpExchange existingAmqpExchange = this.exchangeMap.get(virtualHost).putIfAbsent(exchangeName, amqpExchange);
- if (existingAmqpExchange != null) {
- return existingAmqpExchange;
- } else {
- return amqpExchange;
- }
- }
-
- public void deleteExchange(VirtualHost virtualHost, String exchangeName) {
- if (StringUtils.isEmpty(exchangeName)) {
- return;
- }
- removeExchangeFuture(virtualHost, exchangeName);
- }
-
- private void removeExchangeFuture(VirtualHost virtualHost, String exchangeName) {
- if (exchangeMap.containsKey(virtualHost)) {
- exchangeMap.get(virtualHost).remove(exchangeName);
- }
- }
-}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/ExchangeService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/ExchangeService.java
deleted file mode 100644
index 212efbab..00000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/ExchangeService.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.eventmesh.runtime.core.protocol.amqp;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.metamodels.AmqpExchange;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * provide services to use ExchangeContainer
- */
-public class ExchangeService {
- public CompletableFuture<AmqpExchange> exchangeDeclare(VirtualHost virtualHost, String exchange, String type,
- boolean passive, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) {
- if (isDefaultExchange(exchange)) {
- // if default exchange is declaring
-
- } else {
- if (isBuildInExchange(type)) {
- // invoke container to create AmqpExchange
- } else {
- // exception handle
- }
- }
- return null;
- }
-
- public static boolean isDefaultExchange(String exchangeName) {
- return StringUtils.isBlank(exchangeName);
- }
-
- public static boolean isBuildInExchange(String type) {
- return AmqpExchange.Type.value(type) == null;
- }
-}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/MetaMessageService.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/MetaMessageService.java
deleted file mode 100644
index 347b5826..00000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/MetaMessageService.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.core.protocol.amqp;
-
-import lombok.Getter;
-
-/**
- * a service that manage meta message, including the exchange, queue and vhost related to a single connection
- */
-public class MetaMessageService {
- @Getter
- private ExchangeContainer exchangeContainer;
- @Getter
- private QueueContainer queueContainer;
- @Getter
- private ExchangeService exchangeService;
- @Getter
- private QueueService queueService;
-
-}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java
index 4e954d85..72e4df73 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpChannel.java
@@ -1,52 +1,73 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.amqp.processor;
-import org.apache.eventmesh.runtime.core.protocol.amqp.ExchangeContainer;
-import org.apache.eventmesh.runtime.core.protocol.amqp.ExchangeService;
-import org.apache.eventmesh.runtime.core.protocol.amqp.QueueContainer;
-import org.apache.eventmesh.runtime.core.protocol.amqp.QueueService;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.impl.AMQCommand;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshAmqpServer;
+import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
+import org.apache.eventmesh.runtime.core.protocol.amqp.exception.AmqpException;
+import org.apache.eventmesh.runtime.core.protocol.amqp.exchange.ExchangeDefaults;
+import org.apache.eventmesh.runtime.core.protocol.amqp.metadata.model.ExchangeInfo;
+import org.apache.eventmesh.runtime.core.protocol.amqp.metadata.model.QueueInfo;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeService;
+import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueService;
+import org.apache.eventmesh.runtime.core.protocol.amqp.util.NameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes.INTERNAL_ERROR;
+import static org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes.NOT_FOUND;
-import com.rabbitmq.client.impl.AMQCommand;
+/**
+ * Amqp Channel level method processor.
+ */
public class AmqpChannel implements ChannelMethodProcessor {
private final Logger log = LoggerFactory.getLogger(this.getClass().getName());
- private int channelId;
- private AmqpConnection connection;
- private long amqpMaxMessageSize;
+ private final int channelId;
+ private final AmqpConnection connection;
+ private final AtomicBoolean blocking = new AtomicBoolean(false);
+ private final AtomicBoolean closing = new AtomicBoolean(false);
+ private AtomicLong confirmedMessageCounter = new AtomicLong(0);
+ private boolean confirmOnPublish;
/**
- * Maps from consumer tag to consumers instance.
+ * A channel has a default queue (the last declared) that is used when no queue name is explicitly set.
*/
- // private final Map<String, Consumer> tag2ConsumersMap = new ConcurrentHashMap<>();
+ private volatile QueueInfo defaultQueue;
+
+ //private final UnacknowledgedMessageMap unacknowledgedMessageMap;
+
+// /**
+// * Maps from consumer tag to consumers instance.
+// */
+// private final Map<String, CompletableFuture<Consumer>> tag2ConsumersMap = new ConcurrentHashMap<>();
+
+ private List<CompletableFuture<Void>> pendingPublishList = new CopyOnWriteArrayList<>();
/**
* The current message - which may be partial in the sense that not all frames have been received yet - which has
@@ -65,118 +86,425 @@ public class AmqpChannel implements ChannelMethodProcessor {
* value of this represents the <b>last</b> tag sent out.
*/
private AtomicLong deliveryTag = new AtomicLong(0);
-
private ExchangeService exchangeService;
private QueueService queueService;
- private ExchangeContainer exchangeContainer;
- private QueueContainer queueContainer;
+ //private AmqpZooKeeperCacheService localZooKeeperCacheService;
+ private EventMeshAmqpConfiguration amqpConfiguration;
+ private EventMeshAmqpServer amqpServer;
+ private AtomicInteger unConfirmedMessageCount = new AtomicInteger(0);
+ private long lastStatTimestamp = System.nanoTime();
+ private final long amqpMaxMessageSize;
+ private static final int StatsPeriodSeconds = 1;
- public AmqpChannel(int channelId, AmqpConnection amqpConnection) {
+ private String virtualHostName;
+
+ public AmqpChannel(int channelId, AmqpConnection connection) {
this.channelId = channelId;
- this.connection = amqpConnection;
- this.exchangeService = this.connection.getAmqpBrokerService().getExchangeService();
- this.queueService = this.connection.getAmqpBrokerService().getQueueService();
- this.exchangeContainer = this.connection.getAmqpBrokerService().getExchangeContainer();
- this.queueContainer = this.connection.getAmqpBrokerService().getQueueContainer();
+ this.connection = connection;
+ this.amqpServer = connection.getAmqpService();
+ this.exchangeService = amqpServer.getExchangeService();
+ this.queueService = amqpServer.getQueueService();
+ this.amqpConfiguration = amqpServer.getEventMeshAmqpConfiguration();
+ this.virtualHostName = connection.getVirtualHostName();
+ this.amqpMaxMessageSize = amqpServer.getEventMeshAmqpConfiguration().maxMessageSize;
}
+
@Override
- public void receiveChannelFlow(boolean active) {
+ public void receiveAccessRequest(String realm, boolean exclusive, boolean passive, boolean active,
+ boolean write, boolean read) {
+ log.info("RECV[{}] AccessRequest[ realm: {}, exclusive: {}, passive: {}, active: {}, write: {}, read: {} ]",
+ getCurrentEnv(), realm, exclusive, passive, active, write, read);
+
+ // We don't implement access control class, but to keep clients happy that expect it always use the "0" ticket.
+ connection.writeMethod(connection.getCommandFactory().createAccessRequestOkBody(0), channelId);
}
@Override
- public void receiveChannelFlowOk(boolean active) {
+ public void receiveExchangeDeclare(String exchange, String type, boolean passive, boolean durable,
+ boolean autoDelete, boolean internal, boolean nowait, Map<String, Object> arguments) {
+ log.info(
+ "RECV[{}] ExchangeDeclare[ exchange: {}, type: {}, passive: {}, durable: {}, autoDelete: {}, internal: {}, nowait: {}, arguments: {} ]",
+ getCurrentEnv(), exchange, type, passive, durable, autoDelete, internal, nowait, arguments);
+ if (StringUtils.isNotBlank(exchange)) {
+ try {
+ NameUtils.checkName(exchange);
+ } catch (IllegalArgumentException e) {
+ log.error("[{}] Exchange Name is illegal:{}", getCurrentEnv(), exchange, e);
+ closeChannel(ErrorCodes.ARGUMENT_INVALID, e.getMessage());
+ return;
+ }
+ }
+ if (passive) {
+ ExchangeInfo exchangeInfo = getExchangeInfo(exchange);
+ if (exchangeInfo == null) {
+ closeChannel(NOT_FOUND, "can not found exchange:" + exchange);
+ } else {
+ if (!nowait) {
+ final AMQP.Exchange.DeclareOk declareOkBody = connection.getCommandFactory().createExchangeDeclareOkBody();
+ connection.writeMethod(declareOkBody, channelId);
+ }
+ }
+ return;
+ }
+
+ try {
+ exchangeService.exchangeDeclare(connection.getVirtualHostName(), exchange, type, durable,
+ autoDelete, internal, arguments);
+ log.info("[{}] {}exchangeDeclare success ", getCurrentEnv(), exchange);
+ if (!nowait) {
+ final AMQP.Exchange.DeclareOk declareOkBody = connection.getCommandFactory().createExchangeDeclareOkBody();
+ connection.writeMethod(declareOkBody, channelId);
+ }
+ } catch (AmqpException e) {
+ log.info("[{}] {}exchangeDeclare failed ", getCurrentEnv(), exchange, e);
+ processChannelCommandException(e);
+ }
}
@Override
- public void receiveChannelClose(int replyCode, String replyText, int classId, int methodId) {
+ public void receiveExchangeDelete(String exchange, boolean ifUnused, boolean nowait) {
+ log.info("RECV[{}] ExchangeDelete[ exchange: {}, ifUnused: {}, nowait:{} ]", getCurrentEnv(), exchange,
+ ifUnused, nowait);
+
+ try {
+ exchangeService.exchangeDelete(connection.getVirtualHostName(), exchange, ifUnused);
+ log.info("[{}] {} ExchangeDelete success ", getCurrentEnv(), exchange);
+ if (!nowait) {
+ AMQP.Exchange.DeleteOk deleteOk = connection.getCommandFactory().createExchangeDeleteOkBody();
+ connection.writeMethod(deleteOk, channelId);
+ }
+ } catch (AmqpException e) {
+ log.info("[{}] {} ExchangeDelete failed ", getCurrentEnv(), exchange, e);
+ processChannelCommandException(e);
+ }
+ }
+ @Override
+ public void receiveExchangeBound(String exchange, String routingKey, String queueName) {
+
+ connection.writeMethod(connection.getCommandFactory().createExchangeBindOkBody(), channelId);
+
+ }
+
+ @Override
+ public void receiveQueueDeclare(String queue, boolean passive, boolean durable, boolean exclusive,
+ boolean autoDelete, boolean nowait, Map<String, Object> arguments) {
+ log.info("RECV[{}] QueueDeclare[ queue: {}, passive: {}, durable:{}, "
+ + "exclusive:{}, autoDelete:{}, nowait:{}, arguments:{} ]",
+ getCurrentEnv(), queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+ if (StringUtils.isNotBlank(queue)) {
+ try {
+ NameUtils.checkName(queue);
+ } catch (IllegalArgumentException e) {
+ log.error("Name is illegal:{}", queue, e);
+ closeChannel(ErrorCodes.ARGUMENT_INVALID, e.getMessage());
+ return;
+ }
+ }
+
+ QueueInfo queueInfo = getQueueInfo(queue);
+
+ if (checkExclusiveQueue(queueInfo, connection.getConnectionId())) {
+ log.error("checkExclusiveQueue failed {},{}", getCurrentEnv(), queueInfo);
+ closeChannel(ErrorCodes.ALREADY_EXISTS, "Exclusive queue can not be used from other connection, queueName:"
+ + queueInfo.getQueueName());
+ return;
+ }
+
+ if (passive) {
+ if (queueInfo == null) {
+ closeChannel(NOT_FOUND, "can not found queue:" + queue);
+ } else {
+ if (!nowait) {
+ connection.writeMethod(connection.getCommandFactory().createQueueDeclareOkBody(queueInfo.getQueueName(),
+ 0, 0), channelId);
+ }
+ }
+ return;
+ }
+
+
+ try {
+ QueueInfo q = queueService.queueDeclare(connection.getConnectionId(), connection.getVirtualHostName(), queue,
+ durable, exclusive, autoDelete, arguments);
+ log.info("[{}] {} queueDeclare success", getCurrentEnv(), queue);
+ setDefaultQueue(q);
+ if (!nowait) {
+ connection.writeMethod(connection.getCommandFactory().createQueueDeclareOkBody(q.getQueueName(),
+ 0, 0), channelId);
+ }
+ } catch (AmqpException e) {
+ log.info("[{}] {} queueDeclare failed", getCurrentEnv(), queue, e);
+ processChannelCommandException(e);
+ }
+ }
+
+ @Override
+ public void receiveQueueBind(String queue, String exchange, String bindingKey,
+ boolean nowait, Map<String, Object> argumentsTable) {
+ log.info("RECV[{}] QueueBind[ queue: {}, exchange: {}, bindingKey:{}, nowait:{}, arguments:{} ]",
+ getCurrentEnv(), queue, exchange, bindingKey, nowait, argumentsTable);
+
+ if (StringUtils.isNotBlank(bindingKey)) {
+ try {
+ NameUtils.checkName(bindingKey);
+ } catch (IllegalArgumentException e) {
+ log.error("bindingKey is illegal:{} {}", queue, bindingKey, e);
+ closeChannel(ErrorCodes.ARGUMENT_INVALID, e.getMessage());
+ return;
+ }
+ }
+
+ if (checkExclusiveQueue(queue, connection.getConnectionId())) {
+ return;
+ }
+
+ try {
+ queueService.queueBind(connection.getVirtualHostName(), getDefaultQueue(), queue, exchange,
+ bindingKey, argumentsTable);
+ log.info("[{}] Success to bind exchange:{} to queue:{}", getCurrentEnv(), exchange, queue);
+ if (!nowait) {
+ connection.writeMethod(connection.getCommandFactory().createQueueBindOkBody(), channelId);
+ }
+ } catch (AmqpException e) {
+ log.info("[{}] failed to bind exchange:{} to queue:{}", getCurrentEnv(), exchange, queue, e);
+ processChannelCommandException(e);
+ }
+ }
+
+ @Override
+ public void receiveQueuePurge(String queue, boolean nowait) {
+ log.info("RECV[{}] QueuePurge[ queue: {}, nowait:{} ]", getCurrentEnv(), queue, nowait);
+ if (checkExclusiveQueue(queue, connection.getConnectionId())) {
+ return;
+ }
+
+ queueService.queuePurge(connection.getVirtualHostName(), queue);
+ log.info("[{}] success to purge queue:{}", getCurrentEnv(), queue);
+ if (!nowait) {
+ connection.writeMethod(connection.getCommandFactory().createQueuePurgeOkBody(0), channelId);
+ }
+
+ }
+
+ @Override
+ public void receiveQueueDelete(String queue, boolean ifUnused, boolean ifEmpty, boolean nowait) {
+ log.info("RECV[{}] QueueDelete[ queue: {}, ifUnused:{}, ifEmpty:{}, nowait:{} ]", getCurrentEnv(), queue,
+ ifUnused, ifEmpty, nowait);
+
+ if (checkExclusiveQueue(queue, connection.getConnectionId())) {
+ return;
+ }
+
+ try {
+ queueService.queueDelete(connection.getVirtualHostName(), getDefaultQueue(), queue, ifUnused, ifEmpty);
+ log.info("[{}] success to delete queue:{}", getCurrentEnv(), queue);
+ if (!nowait) {
+ connection.writeMethod(connection.getCommandFactory().createQueueDeleteOkBody(0), channelId);
+ }
+ } catch (AmqpException e) {
+ log.error("[{}] Failed to delete queue:{}", getCurrentEnv(), queue, e);
+ processChannelCommandException(e);
+ }
+ }
+
+ @Override
+ public void receiveQueueUnbind(String queue, String exchange, String bindingKey,
+ Map<String, Object> arguments) {
+ log.info("RECV[{}] QueueUnbind[ queue: {}, exchange:{}, bindingKey:{}, arguments:{} ]", getCurrentEnv(), queue,
+ exchange, bindingKey, arguments);
+ if (checkExclusiveQueue(queue, connection.getConnectionId())) {
+ return;
+ }
+
+ try {
+ queueService.queueUnbind(connection.getVirtualHostName(), queue, exchange, bindingKey, arguments);
+ log.info("[{}] success to unbind queue:{}, exchange:{}", getCurrentEnv(), queue, exchange);
+ connection.writeMethod(connection.getCommandFactory().createQueueUnbindOkBody(), channelId);
+ } catch (AmqpException e) {
+ log.error("[{}] Failed to unbind queue:{}, exchange:{}", getCurrentEnv(), queue, exchange, e);
+ processChannelCommandException(e);
+ }
+
+ }
+
+ @Override
+ public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) {
+ log.info("RECV[{}] BasicQos[prefetchSize: {} prefetchCount: {} global: {}]",
+ getCurrentEnv(), prefetchSize, prefetchCount, global);
+ if (prefetchSize > 0) {
+ closeChannel(ErrorCodes.NOT_IMPLEMENTED, "prefetchSize not supported ");
+ return;
+ }
+ //creditManager.setCreditLimits(0, prefetchCount);
+ connection.writeMethod(connection.getCommandFactory().createBasicQosOkBody(), channelId);
+ }
+
+ @Override
+ public void receiveBasicConsume(String queue, String consumerTag,
+ boolean noLocal, boolean noAck, boolean exclusive,
+ boolean nowait, Map<String, Object> arguments) {
+
+ }
+
+ @Override
+ public void receiveBasicCancel(String consumerTag, boolean noWait) {
+
+ }
+
+ private void setPublishFrame(AMQP.Basic.Publish publishFrame) {
+ currentMessage = new AMQCommand(publishFrame);
+ }
+
+ @Override
+ public void receiveBasicGet(String queue, boolean noAck) {
+
+
+ }
+
+
+ @Override
+ public void receiveChannelFlow(boolean active) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] ChannelFlow[active: {}]", channelId, active);
+ }
+ // TODO channelFlow process
+ connection.writeMethod(connection.getCommandFactory().createChannelFlowOkBody(true), channelId);
+ }
+
+ @Override
+ public void receiveChannelFlowOk(boolean active) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] ChannelFlowOk[active: {}]", channelId, active);
+ }
+ }
+
+ @Override
+ public void receiveChannelClose(int replyCode, String replyText, int classId, int methodId) {
+ log.info("RECV[{}] ChannelClose[replyCode: {} replyText: {} classId: {} methodId: {}",
+ getCurrentEnv(), replyCode, replyText, classId, methodId);
+ // TODO Process outstanding client requests
+ processAsync();
+ connection.closeChannel(this);
+ connection.writeMethod(connection.getCommandFactory().createChannelCloseOkBody(), channelId);
}
@Override
public void receiveChannelCloseOk() {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[ {} ] ChannelCloseOk", channelId);
+ }
+ connection.closeChannelOk(getChannelId());
+ }
+
+ private boolean hasCurrentMessage() {
+ return currentMessage != null;
}
@Override
- public void receiveMessageContent(AMQPFrame data) {
+ public void receiveBasicPublish(String exchange, String routingKey, boolean mandatory,
+ boolean immediate) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] BasicPublish[exchange: {} routingKey: {} mandatory: {} immediate: {}]",
+ getCurrentEnv(), exchange, routingKey, mandatory, immediate);
+ }
+ String exchangeName;
+ if (isDefaultExchange(exchange)) {
+ exchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME_DURABLE;
+ } else {
+ exchangeName = exchange;
+ }
+
+// if (getExchangeInfo(exchangeName) == null) {
+// closeChannel(NOT_FOUND, "exchange:" + exchangeName + " not find");
+// return;
+// }
+
+ setPublishFrame(connection.getCommandFactory().createBasicPublishBody(0, exchangeName, routingKey, mandatory, immediate));
+ }
+
+ @Override
+ public void receiveMessageContent(AMQPFrame frame) {
if (log.isDebugEnabled()) {
try {
- log.debug("receive messageContent[data:{}]", new String(data.getData(), "UTF8"));
+ log.debug("RECV[{}] MessageContent[data:{}]", getCurrentEnv(), new String(frame.getData(), "UTF8"));
} catch (UnsupportedEncodingException e) {
- log.error("Failed to encode data:{}", e);
+ log.error("Failed to encode data:{}", getCurrentEnv(), e);
}
}
if (hasCurrentMessage()) {
- // TODO: 2022/9/20 send message to mq
// try {
-// if (currentMessage.handleFrame(data)) {
-// processSendMessage();
-// }
+//// if (currentMessage.handleFrame(frame)) {
+//// deliverCurrentMessageIfComplete();
+//// }
// } catch (IOException e) {
// log.error("receiveMessageContent exception {}", e);
// closeChannel(ErrorCodes.COMMAND_INVALID,
-// "Attempt to send a content not valid");
+// "Attempt to send a content not valid");
// } catch (Exception e) {
// log.error("receiveMessageContent exception {}", e);
// closeChannel(ErrorCodes.SYNTAX_ERROR,
-// "system error");
+// "system error");
// }
} else {
closeChannel(ErrorCodes.COMMAND_INVALID,
- "Attempt to send a content without first sending a publish frame");
+ "Attempt to send a content without first sending a publish frame");
}
}
@Override
public void receiveMessageHeader(AMQPFrame frame) {
if (log.isDebugEnabled()) {
- log.debug("recv[{}] message header[{}]", channelId, frame);
+ log.debug("RECV[{}] MessageHeader[{}]", channelId, frame);
}
if (hasCurrentMessage()) {
- // TODO: 2022/9/20 send message to mq
// try {
-// if (currentMessage.handleFrame(frame)) {
-// processSendMessage();
-// }
+//// if (currentMessage.handleFrame(frame)) {
+//// deliverCurrentMessageIfComplete();
+//// }
// } catch (IOException e) {
// log.error("receiveMessageHeader exce {}", e.getMessage());
// closeChannel(ErrorCodes.COMMAND_INVALID,
-// "Attempt to send a content not valid");
+// "Attempt to send a content not valid");
// } catch (Exception e) {
// log.error("receiveMessageHeader exce {}", e.getMessage());
// closeChannel(ErrorCodes.SYNTAX_ERROR,
-// "system error");
+// "system error");
// }
long bodySize = currentMessage.getContentHeader().getBodySize();
if (bodySize > amqpMaxMessageSize) {
log.error("RECV[{}] too large message bodySize {}", channelId, bodySize);
closeChannel(ErrorCodes.MESSAGE_TOO_LARGE,
- "Message size of " + bodySize + " greater than allowed maximum of " + amqpMaxMessageSize);
+ "Message size of " + bodySize + " greater than allowed maximum of " + amqpMaxMessageSize);
}
} else {
closeChannel(ErrorCodes.COMMAND_INVALID,
- "Attempt to send a content without first sending a publish frame");
+ "Attempt to send a content without first sending a publish frame");
}
}
- /**
- * send message
- */
- private void processSendMessage(){
- // TODO: 2022/9/20 convert to cloudEvent and send to mq
-
+ private void deliverCurrentMessageIfComplete() throws Exception {
}
+ private boolean putMsgCheck(Set<String> queues) {
+ for (String queue : queues) {
+ if (!checkQueueExist(queue)) {
+ return false;
+ }
+ }
+ return true;
+ }
@Override
@@ -189,140 +517,254 @@ public class AmqpChannel implements ChannelMethodProcessor {
}
- @Override
- public void receiveBasicAck(long deliveryTag, boolean multiple) {
-
- }
@Override
- public void receiveAccessRequest(String realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) {
+ public void receiveBasicRecover(boolean requeue, boolean sync) {
}
- @Override
- public void receiveExchangeDeclare(String exchange, String type, boolean passive, boolean durable, boolean autoDelete, boolean internal, boolean nowait, Map<String, Object> arguments) {
-
- }
@Override
- public void receiveExchangeDelete(String exchange, boolean ifUnused, boolean nowait) {
+ public void receiveBasicAck(long deliveryTag, boolean multiple) {
}
+
@Override
- public void receiveExchangeBound(String exchange, String routingKey, String queue) {
+ public void receiveBasicReject(long deliveryTag, boolean requeue) {
}
@Override
- public void receiveQueueDeclare(String queue, boolean passive, boolean durable, boolean exclusive, boolean autoDelete, boolean nowait, Map<String, Object> arguments) {
-
+ public void receiveTxSelect() {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] TxSelect", getCurrentEnv());
+ }
+ // TODO txSelect process
+ connection.writeMethod(connection.getCommandFactory().createTxSelectOkBody(), channelId);
}
@Override
- public void receiveQueueBind(String queue, String exchange, String bindingKey, boolean nowait, Map<String, Object> arguments) {
-
+ public void receiveTxCommit() {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] TxCommit", getCurrentEnv());
+ }
+ // TODO txCommit process
+ connection.writeMethod(connection.getCommandFactory().createTxCommitOkBody(), channelId);
}
@Override
- public void receiveQueuePurge(String queue, boolean nowait) {
-
+ public void receiveTxRollback() {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] TxRollback", getCurrentEnv());
+ }
+ // TODO txRollback process
+ connection.writeMethod(connection.getCommandFactory().createTxRollbackOkBody(), channelId);
}
@Override
- public void receiveQueueDelete(String queue, boolean ifUnused, boolean ifEmpty, boolean nowait) {
+ public void receiveConfirmSelect(boolean nowait) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[{}] ConfirmSelect [ nowait: {} ]", getCurrentEnv(), nowait);
+ }
+ confirmOnPublish = true;
+ if (!nowait) {
+ connection.writeMethod(connection.getCommandFactory().createConfirmSelectOkBody(), channelId);
+ }
}
- @Override
- public void receiveQueueUnbind(String queue, String exchange, String bindingKey, Map<String, Object> arguments) {
-
+ public void receivedComplete() {
+ processAsync();
}
- @Override
- public void receiveBasicRecover(boolean requeue, boolean sync) {
+ private void sendChannelClose(int cause, final String message) {
+ connection.closeChannelAndWriteFrame(this, cause, message);
+ }
+ public void processAsync() {
+// if (!pendingPublishList.isEmpty()) {
+// log.info("[{}] closing messageStore, and pendingPublishList size is:{}", getCurrentEnv(), pendingPublishList.size());
+// FutureUtil.waitForAll(pendingPublishList).whenComplete((aVoid, ex) -> {
+// if (ex != null) {
+// log.error("[{}] Failed to complete publish.", getCurrentEnv(), ex);
+// } else {
+// log.info("[{}] pendingPublishList had all done.", getCurrentEnv());
+// pendingPublishList.clear();
+// }
+// });
+// }
}
- @Override
- public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) {
+ public void close() {
+ // TODO
+ if (!closing.compareAndSet(false, true)) {
+ //Channel is already closing
+ return;
+ }
+// unsubscribeConsumerAll();
+ // TODO need to delete exclusive queues in this channel.
+ setDefaultQueue(null);
}
- @Override
- public void receiveBasicConsume(String queue, String consumerTag, boolean noLocal, boolean noAck, boolean exclusive, boolean nowait, Map<String, Object> arguments) {
- log.info("RECV BasicConsume[queue:{} consumerTag:{} noLocal:{} noAck:{} exclusive:{} nowait:{}"
- + " arguments:{}]", queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
+ public synchronized void block() {
+ // TODO
+ }
- // TODO check queue exist
+ public synchronized void unblock() {
+ // TODO
+ }
- final String consumerTag1;
- if (StringUtils.isBlank(consumerTag)) {
- consumerTag1 = "consumerTag_" + getNextConsumerTag();
- } else {
- consumerTag1 = consumerTag;
- }
+ public int getChannelId() {
+ return channelId;
+ }
- // tag2ConsumersMap.computeIfAbsent(consumerTag1, (c) ->new ConsumerImpl());
- if (!nowait) {
- //connection.writeMethod(connection.getCommandFactory().createBasicConsumeOkBody(consumer.getConsumerTag()), channelId);
- }
+ public boolean isClosing() {
+ return closing.get() || connection.isClosing();
}
- @Override
- public void receiveBasicCancel(String consumerTag, boolean noWait) {
+ private boolean isDefaultExchange(final String exchangeName) {
+ return StringUtils.isBlank(exchangeName);
+ }
+ public void closeChannel(int cause, final String message) {
+// if (closing.get()) {
+// if (log.isDebugEnabled()) {
+// log.debug("Channel is already closing id {} ", channelId);
+// }
+// return;
+// }
+ connection.closeChannelAndWriteFrame(this, cause, message);
}
- @Override
- public void receiveBasicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate) {
+ public long getNextDeliveryTag() {
+ return deliveryTag.incrementAndGet();
+ }
+ private int getNextConsumerTag() {
+ return consumerTag.incrementAndGet();
}
- @Override
- public void receiveBasicGet(String queue, boolean noAck) {
+ private long getNextConfirmedCounter() {
+ return confirmedMessageCounter.incrementAndGet();
+ }
+ public AmqpConnection getConnection() {
+ return connection;
}
- @Override
- public void receiveBasicReject(long deliveryTag, boolean requeue) {
+ protected void setDefaultQueue(QueueInfo queue) {
+ defaultQueue = queue;
}
- @Override
- public void receiveTxSelect() {
+ protected QueueInfo getDefaultQueue() {
+ return defaultQueue;
+ }
+ public String getExchangeType(String exchangeName) {
+ if (StringUtils.isBlank(exchangeName)) {
+ exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ }
+ switch (exchangeName) {
+ case ExchangeDefaults.DIRECT_EXCHANGE_NAME:
+ return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+ case ExchangeDefaults.FANOUT_EXCHANGE_NAME:
+ return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ case ExchangeDefaults.TOPIC_EXCHANGE_NAME:
+ return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+ default:
+ return "";
+ }
}
- @Override
- public void receiveTxCommit() {
+ private boolean checkExclusiveQueue(String queue, String connectionId) {
+ if (StringUtils.isBlank(queue)) {
+ return false;
+ }
+ AtomicBoolean isExclusive = new AtomicBoolean(false);
+ QueueInfo amqpQueue = getQueueInfo(queue);
+
+ if (amqpQueue != null && amqpQueue.isExclusive()
+ && !amqpQueue.getConnectionId().equals(connectionId)) {
+ isExclusive.set(true);
+ String message = "Exclusive queue can not be used from other connection, queueName:"
+ + amqpQueue.getQueueName();
+ log.error("{},{}", getCurrentEnv(), message);
+ closeChannel(ErrorCodes.ALREADY_EXISTS, message);
+ }
+ return isExclusive.get();
}
- @Override
- public void receiveTxRollback() {
+ private boolean checkExclusiveQueue(QueueInfo queueInfo, String connectionId) {
+ if (queueInfo != null && queueInfo.isExclusive()
+ && !queueInfo.getConnectionId().equals(connectionId)) {
+ return true;
+ }
+ return false;
}
- @Override
- public void receiveConfirmSelect(boolean nowait) {
+ private boolean checkQueueExist(String queueName) {
+ try {
+ return queueService.checkExist(virtualHostName, queueName);
+ } catch (Exception e) {
+ log.error("amqpChannel checkQueueExist error {} ", getCurrentEnv(), e);
+ return false;
+ }
+ }
+ private boolean checkExchangeExist(String exchangeName) {
+ try {
+ return exchangeService.checkExchangeExist(virtualHostName, exchangeName);
+ } catch (Exception e) {
+ log.error("amqpChannel checkExchangeExist error {} ", getCurrentEnv(), e);
+ return false;
+ }
}
- public void closeChannel(int cause, final String message) {
- // TODO
+ private QueueInfo getQueueInfo(String queueName) {
+ try {
+ return queueService.getQueue(virtualHostName, queueName);
+ } catch (Exception e) {
+ log.error("amqpChannel getQueueInfo error {} ", getCurrentEnv(), e);
+ return null;
+ }
}
+ private ExchangeInfo getExchangeInfo(String exchangeName) {
+ try {
+ return exchangeService.getExchange(virtualHostName, exchangeName);
+ } catch (Exception e) {
+ log.error("amqpChannel getQueueInfo error {} ", getCurrentEnv(), e);
+ return null;
+ }
+ }
- private boolean hasCurrentMessage() {
- return currentMessage != null;
+ private String getConsumerName(String tag) {
+ return getConnection().getConnectionId() + "-" + channelId + "-" + tag;
}
- public long getNextDeliveryTag() {
- return deliveryTag.incrementAndGet();
+ private String getCurrentEnv() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("_amqp_env_>>");
+ sb.append(connection.getConnectionId()).append("/").
+ append(channelId).append("/").append(virtualHostName);
+ return sb.toString();
}
- private int getNextConsumerTag() {
- return consumerTag.incrementAndGet();
+
+ private void processChannelCommandException(Throwable throwable) {
+ Throwable exce = throwable.getCause() == null ? throwable : throwable.getCause();
+ if (exce instanceof AmqpException) {
+ AmqpException ex = (AmqpException) exce;
+ log.error("processChannelCommandException errorCode: {} errorMessage {}", ex.getErrorCode(), ex.getMessage(), ex);
+ closeChannel(ex.getErrorCode(), ex.getMessage());
+ } else {
+ log.error("processChannelCommandException ", exce);
+ closeChannel(INTERNAL_ERROR, exce.getMessage());
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org