You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/16 05:35:56 UTC
[incubator-eventmesh] branch protocol-amqp updated: modify amqp connection command process
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch protocol-amqp
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/protocol-amqp by this push:
new 0a327956 modify amqp connection command process
new e1c07a77 Merge pull request #1615 from wangshaojie4039/protocol-amqp
0a327956 is described below
commit 0a32795648fb28aecdf74832271297ec4b3a5775
Author: wangshaojie <wa...@cmss.chinamobile.com>
AuthorDate: Sun Oct 16 13:11:51 2022 +0800
modify amqp connection command process
---
.../protocol/amqp/processor/AmqpConnection.java | 686 ++++++++++++++-------
1 file changed, 473 insertions(+), 213 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
index a894a1fb..223efc08 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/processor/AmqpConnection.java
@@ -1,58 +1,60 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.eventmesh.runtime.core.protocol.amqp.processor;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.LongString;
+import com.rabbitmq.client.impl.AMQImpl;
+import com.rabbitmq.client.impl.Frame;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.Getter;
import org.apache.eventmesh.runtime.boot.EventMeshAmqpServer;
-import org.apache.eventmesh.runtime.core.protocol.amqp.MetaMessageService;
-import org.apache.eventmesh.runtime.core.protocol.amqp.VirtualHost;
+import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
-import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.constants.ProtocolKey;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.CommandFactory;
+import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeDecoder;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolFrame;
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ProtocolVersion;
-import org.apache.eventmesh.runtime.util.AmqpUtils;
-
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * Amqp server level method processor.
+ */
+public class AmqpConnection extends AmqpHandler {
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.Attribute;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.LongString;
-import com.rabbitmq.client.impl.AMQImpl;
-import com.rabbitmq.client.impl.Frame;
+ private final Logger log = LoggerFactory.getLogger(this.getClass().getName());
-public class AmqpConnection extends AmqpHandler {
- private static final Logger logger = LoggerFactory.getLogger(AmqpConnection.class);
+ public static final String DEFAULT_VIRTUALHOST = "/";
- enum ConnectionState {
+ public enum ConnectionState {
INIT,
AWAIT_START_OK,
AWAIT_SECURE_OK,
@@ -61,305 +63,563 @@ public class AmqpConnection extends AmqpHandler {
OPEN
}
- private volatile ConnectionState state;
-
+ private String connectionId;
+ @Getter
+ private final ConcurrentHashMap<Integer, AmqpChannel> channels;
+ private final ConcurrentHashMap<Integer, Long> closingChannelsList;
+ @Getter
+ private final EventMeshAmqpConfiguration amqpConfig;
private ProtocolVersion protocolVersion;
+ private CommandFactory commandFactory;
+ private volatile ConnectionState state = ConnectionState.INIT;
+ private volatile int currentClassId;
+ private volatile int currentMethodId;
+ @Getter
+ private final AtomicBoolean orderlyClose = new AtomicBoolean(false);
+ private volatile int maxChannels;
+ private volatile int maxFrameSize;
+ private volatile int heartBeat;
+ private String virtualHostName;
+ private final Object channelAddRemoveLock = new Object();
+ private AtomicBoolean blocked = new AtomicBoolean();
+ //private AmqpMessageSender amqpOutputConverter;
+
+
+ public AmqpConnection(EventMeshAmqpServer amqpBrokerService) {
+ super(amqpBrokerService);
+ this.connectionId = UUID.randomUUID().toString();
+ this.channels = new ConcurrentHashMap<>();
+ this.closingChannelsList = new ConcurrentHashMap<>();
+ this.protocolVersion = ProtocolVersion.v0_91;
+ this.commandFactory = new CommandFactory(this.protocolVersion);
+ this.amqpConfig = amqpBrokerService.getEventMeshAmqpConfiguration();
+ this.maxChannels = amqpConfig.maxNoOfChannels;
+ this.maxFrameSize = amqpConfig.maxFrameSize;
+ this.heartBeat = amqpConfig.heartBeat;
+ //this.amqpOutputConverter = new AmqpMessageSender(this);
+ }
- private int heartBeatDelay;
-
- private int maxFrameSize;
-
- private int maxChannelNo;
-
- private VirtualHost vhost;
-
- private MetaMessageService metaMessageService;
-
- private ConcurrentHashMap<Integer, AmqpChannel> channels;
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ super.channelActive(ctx);
+ this.remoteAddress = ctx.channel().remoteAddress();
+ this.ctx = ctx;
+ isActive.set(true);
+ }
- private ConcurrentHashMap<Integer, AmqpChannel> closingChannelsList;
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ completeAndCloseAllChannels();
+ //amqpBrokerService.getConnectionContainer().removeConnection(virtualHostName, this);
+ }
- public AmqpConnection(EventMeshAmqpServer amqpServer) {
- super(amqpServer);
- this.channels = new ConcurrentHashMap<>();
- this.closingChannelsList = new ConcurrentHashMap<>();
- this.state = ConnectionState.INIT;
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error("[{}] Got exception: {}", remoteAddress, cause.getMessage(), cause);
+ close();
}
@Override
public void close() {
-
+ if (isActive.getAndSet(false)) {
+ log.info("close netty channel {}", ctx.channel());
+ ctx.close();
+ }
}
@Override
- public void receiveConnectionStartOk(Map<String, Object> clientProperties, String mechanism, LongString response, String locale) {
- if (logger.isDebugEnabled()) {
- logger.debug("RECV ConnectionStartOk[clientProperties: {}, mechanism: {}, locale: {}]",
+ public void receiveConnectionStartOk(Map<String, Object> clientProperties, String mechanism, LongString response,
+ String locale) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV ConnectionStartOk[clientProperties: {}, mechanism: {}, locale: {}]",
clientProperties, mechanism, locale);
}
assertState(ConnectionState.AWAIT_START_OK);
- if (StringUtils.isBlank(mechanism)) {
- AmqpUtils.sendConnectionClose(ErrorCodes.CONNECTION_FORCED, "No mechanism provided", 0);
- return;
- }
- if (response == null || response.length() == 0) {
- AmqpUtils.sendConnectionClose(ErrorCodes.CONNECTION_FORCED, "No authentication data provided", 0);
- return;
- }
- // TODO: 2022/9/20 Authentication Mechanism set up
-
- AMQImpl.Connection.Tune tune = (AMQImpl.Connection.Tune) new AMQImpl.Connection.Tune.Builder()
- .channelMax(maxChannelNo)
- .frameMax(maxFrameSize)
- .heartbeat(heartBeatDelay)
- .build();
- this.state = ConnectionState.AWAIT_TUNE_OK;
- try {
- ctx.writeAndFlush(AMQPFrame.get(tune.toFrame(0)));
- } catch (IOException e) {
- logger.error("Exception occur while handling startOk, e:", e);
- AmqpUtils.sendConnectionClose(ErrorCodes.INTERNAL_ERROR, "exception happen when generate frame", 0);
- throw new RuntimeException(e);
- }
+ log.debug("SASL Mechanism selected: {} Locale : {}, response: {}", mechanism, locale, response);
+ writeMethod(this.commandFactory.createConnectionTuneBody(maxChannels, maxFrameSize, heartBeat), 0);
+ state = ConnectionState.AWAIT_TUNE_OK;
}
@Override
public void receiveConnectionSecureOk(LongString response) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV ConnectionSecureOk");
+ }
+ assertState(ConnectionState.AWAIT_SECURE_OK);
+ writeMethod(this.commandFactory.createConnectionTuneBody(maxChannels,
+ maxFrameSize,
+ heartBeat), 0);
+ state = ConnectionState.AWAIT_TUNE_OK;
}
@Override
public void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat) {
- if (logger.isDebugEnabled()) {
- logger.debug("RECV ConnectionTuneOk[ channelMax: {} frameMax: {} heartbeat: {} ]",
+ if (log.isDebugEnabled()) {
+ log.debug("RECV ConnectionTuneOk[ channelMax: {} frameMax: {} heartbeat: {} ]",
channelMax, frameMax, heartbeat);
}
assertState(ConnectionState.AWAIT_TUNE_OK);
+
if (heartbeat > 0) {
- this.heartBeatDelay = heartbeat;
+ this.heartBeat = heartbeat;
long writerDelay = 1000L * heartbeat;
long readerDelay = 1000L * 2 * heartbeat;
initHeartBeatHandler(writerDelay, readerDelay);
}
-
- int serverFrameMax = getDefaultServerMaxFrameSize();
- if (serverFrameMax <= 0) {
- serverFrameMax = Integer.MAX_VALUE;
+ int brokerFrameMax = maxFrameSize;
+ if (brokerFrameMax <= 0) {
+ brokerFrameMax = Integer.MAX_VALUE;
}
- if (frameMax > (long) serverFrameMax) {
- AmqpUtils.sendConnectionClose(ErrorCodes.SYNTAX_ERROR,
+ if (frameMax > (long) brokerFrameMax) {
+ sendConnectionClose(ErrorCodes.SYNTAX_ERROR,
"Attempt to set max frame size to " + frameMax
+ " greater than the broker will allow: "
- + serverFrameMax, 0);
- } else if (frameMax > 0 && frameMax < amqpServer.getEventMeshAmqpConfiguration().minFrameSize) {
- AmqpUtils.sendConnectionClose(ErrorCodes.SYNTAX_ERROR,
+ + brokerFrameMax, 0);
+ } else if (frameMax > 0 && frameMax < amqpConfig.minFrameSize) {
+ sendConnectionClose(ErrorCodes.SYNTAX_ERROR,
"Attempt to set max frame size to " + frameMax
+ " which is smaller than the specification defined minimum: "
- + amqpServer.getEventMeshAmqpConfiguration().minFrameSize, 0);
+ + amqpConfig.maxFrameSize, 0);
} else {
- int calculatedFrameMax = frameMax == 0 ? serverFrameMax : (int) frameMax;
+ int calculatedFrameMax = frameMax == 0 ? brokerFrameMax : (int) frameMax;
setMaxFrameSize(calculatedFrameMax);
+
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ int value = ((channelMax == 0) || (channelMax > 0xFFFF))
+ ? 0xFFFF
+ : channelMax;
+ maxChannels = value;
}
+ state = ConnectionState.AWAIT_OPEN;
- this.maxChannelNo = ((channelMax == 0) || (channelMax > 0xFFFF))
- ? 0xFFFF
- : channelMax;
- this.state = ConnectionState.AWAIT_OPEN;
}
@Override
public void receiveConnectionOpen(String virtualHost, String capabilities, boolean insist) {
- if (logger.isDebugEnabled()) {
- logger.debug("RECV ConnectionOpen[virtualHost: {} capabilities: {} insist: {} ]",
+ if (log.isDebugEnabled()) {
+ log.debug("RECV ConnectionOpen[virtualHost: {} capabilities: {} insist: {} ]",
virtualHost, capabilities, insist);
}
assertState(ConnectionState.AWAIT_OPEN);
- boolean isDefaultNamespace = false;
- if ((virtualHost != null) && virtualHost.charAt(0) == '/') {
- virtualHost = virtualHost.substring(1);
- if (StringUtils.isEmpty(virtualHost)) {
- isDefaultNamespace = true;
- this.vhost = VirtualHost.DEFAULT_VHOST;
- }
- } else {
- // currently only support default vhost: "/"
- AmqpUtils.sendConnectionClose(ErrorCodes.NOT_FOUND, "Unknown virtual host: '" + virtualHost + "'", 0);
+
+ if (!DEFAULT_VIRTUALHOST.equals(virtualHost)) {
+ sendConnectionClose(ErrorCodes.NOT_FOUND, "vhost: " + virtualHost
+ + " not found ", 0);
return;
}
- AMQImpl.Connection.OpenOk openOk = (AMQImpl.Connection.OpenOk) new AMQP.Connection.OpenOk.Builder()
- .knownHosts(virtualHost)
- .build();
+
+
+ this.virtualHostName = virtualHost;
+ writeMethod(this.commandFactory.createConnectionOpenOkBody(virtualHost), 0);
+ state = ConnectionState.OPEN;
+ }
+
+ @Override
+ public void receiveConnectionClose(int replyCode, String replyText,
+ int classId, int methodId) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV ConnectionClose[ replyCode: {} replyText: {} classId: {} methodId: {} ]",
+ replyCode, replyText, classId, methodId);
+ }
+
try {
- ctx.writeAndFlush(AMQPFrame.get(openOk.toFrame(0)));
- } catch (IOException e) {
- logger.error("Exception occur while handling open, e:", e);
- AmqpUtils.sendConnectionClose(ErrorCodes.INTERNAL_ERROR, "exception happen when generate frame", 0);
- throw new RuntimeException(e);
+ if (orderlyClose.compareAndSet(false, true)) {
+ completeAndCloseAllChannels();
+ }
+
+ writeMethod(this.commandFactory.createConnectionCloseOkBody(), 0);
+ } catch (Exception e) {
+ log.error("Error closing connection for " + this.remoteAddress.toString(), e);
+ } finally {
+ close();
}
- this.state = ConnectionState.OPEN;
}
@Override
- public void receiveChannelOpen(int channelId) {
- if (logger.isDebugEnabled()) {
- logger.debug("RECV[" + channelId + "] ChannelOpen");
+ public void receiveConnectionCloseOk() {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV ConnectionCloseOk");
}
- assertState(ConnectionState.OPEN);
+ close();
+ }
- if (vhost == null) {
- AmqpUtils.sendConnectionClose(ErrorCodes.COMMAND_INVALID,
- "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
- } else if (channels.containsKey(channelId) || channelAwaitingClosure(channelId)) {
- AmqpUtils.sendConnectionClose(ErrorCodes.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
- } else if (channelId > maxChannelNo) {
- AmqpUtils.sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
- "Channel " + channelId + " cannot be created as the max allowed channel id is "
- + maxChannelNo,
- channelId);
- } else {
- AmqpChannel amqpChannel = new AmqpChannel(channelId, this);
- this.channels.put(channelId, amqpChannel);
+ public void sendConnectionClose(int errorCode, String message, int channelId) {
+ sendConnectionClose(channelId, new AMQImpl.Connection.Close(errorCode,
+ message, currentClassId, currentMethodId));
+ }
- AMQImpl.Channel.OpenOk openOk = (AMQImpl.Channel.OpenOk) new AMQP.Channel.OpenOk.Builder()
- .channelId(String.valueOf(channelId))
- .build();
+ private void sendConnectionClose(int channelId, com.rabbitmq.client.Method method) {
+ if (orderlyClose.compareAndSet(false, true)) {
try {
- ctx.writeAndFlush(AMQPFrame.get(openOk.toFrame(channelId)));
- } catch (IOException e) {
- logger.error("Exception occur while handling open, e:", e);
- AmqpUtils.sendConnectionClose(ErrorCodes.INTERNAL_ERROR, "exception happen when generate frame", 0);
- throw new RuntimeException(e);
+ markChannelAwaitingCloseOk(channelId);
+ completeAndCloseAllChannels();
+ } finally {
+ writeMethod(method, 0);
}
}
}
@Override
- public ProtocolVersion getProtocolVersion() {
- return null;
- }
+ public void receiveChannelOpen(int channelId) {
- @Override
- public ChannelMethodProcessor getChannelMethodProcessor(int channelId) {
- return null;
- }
+ if (log.isDebugEnabled()) {
+ log.debug("RECV[" + channelId + "] ChannelOpen");
+ }
+ assertState(ConnectionState.OPEN);
- @Override
- public void receiveConnectionClose(int replyCode, String replyText, int classId, int methodId) {
+ if (this.virtualHostName == null) {
+ sendConnectionClose(ErrorCodes.COMMAND_INVALID,
+ "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
+ } else if (channels.get(channelId) != null || channelAwaitingClosure(channelId)) {
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
+ } else if (channelId > maxChannels) {
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
+ "Channel " + channelId + " cannot be created as the max allowed channel id is "
+ + maxChannels,
+ channelId);
+ } else {
+ log.debug("Connecting to: {}", virtualHostName.toString());
+ final AmqpChannel channel = new AmqpChannel(channelId, this);
+ addChannel(channel);
+ writeMethod(this.commandFactory.createChannelOpenOkBody(channelId), channelId);
+ }
}
- @Override
- public void receiveConnectionCloseOk() {
-
+ private void addChannel(AmqpChannel channel) {
+ synchronized (channelAddRemoveLock) {
+ channels.put(channel.getChannelId(), channel);
+ if (blocked.get()) {
+ channel.block();
+ }
+ }
}
@Override
public void receiveHeartbeat() {
-
+ if (log.isDebugEnabled()) {
+ log.debug("RECV Heartbeat");
+ }
+ // noop
}
- /**
- * process protocol header, validate protocol version etc
- * @param protocolFrame protocol header
- */
@Override
- public void receiveProtocolHeader(ProtocolFrame protocolFrame) {
- if (logger.isDebugEnabled()) {
- logger.debug("RECV Protocol Header [{}]", protocolFrame);
+ public void receiveProtocolHeader(ProtocolFrame pf) {
+ if (log.isDebugEnabled()) {
+ log.debug("RECV Protocol Header [{}]", pf);
}
+
try {
- ProtocolVersion pv = protocolFrame.checkVersion();
- setProtocolVersion(pv);
- // TODO: 2022/9/21 build mechanism
- String mechanisms = "PLAIN token";
- String locales = "en_US";
- // TODO: 2022/9/19 put server Properties into map
- this.state = ConnectionState.AWAIT_START_OK;
- AMQImpl.Connection.Start start = (AMQImpl.Connection.Start) new AMQP.Connection.Start.Builder()
- .versionMajor(pv.getProtocolMajor())
- .versionMinor(pv.getActualMinorVersion())
- .serverProperties(null)
- .mechanisms(mechanisms)
- .locales(locales)
- .build();
- ctx.writeAndFlush(AMQPFrame.get(start.toFrame(0)));
+ ProtocolVersion pv = pf.checkVersion(); // Fails if not correct
+ // TODO serverProperties mechanis
+ this.protocolVersion = pv;
+ writeMethod(this.commandFactory.createConnectionStartBody(
+ (short) pv.getProtocolMajor(),
+ (short) pv.getProtocolMinor(),
+ null,
+ // TODO temporary modification
+ "PLAIN AMQPLAIN",
+ "en_US"), 0);
+ state = ConnectionState.AWAIT_START_OK;
} catch (Exception e) {
- logger.error("Exception occur while handling protocol header, e:", e);
- AmqpUtils.writeFrame(new ProtocolFrame(ProtocolVersion.v0_91));
+ log.error("Received unsupported protocol initiation for protocol version: {} ", getProtocolVersion(), e);
+ writeFrame(new ProtocolFrame(ProtocolVersion.v0_91));
+ close();
throw new RuntimeException(e);
}
-
}
@Override
- public void setCurrentMethod(int classId, int methodId) {
+ public ProtocolVersion getProtocolVersion() {
+ return protocolVersion;
+ }
+ @Override
+ public ChannelMethodProcessor getChannelMethodProcessor(int channelId) {
+ assertState(ConnectionState.OPEN);
+ ChannelMethodProcessor channelMethodProcessor = getChannel(channelId);
+ if (channelMethodProcessor == null) {
+ channelMethodProcessor =
+ (ChannelMethodProcessor) Proxy.newProxyInstance(ChannelMethodProcessor.class.getClassLoader(),
+ new Class[]{ChannelMethodProcessor.class}, new InvocationHandler() {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args)
+ throws Throwable {
+ if (method.getName().equals("receiveChannelCloseOk") && channelAwaitingClosure(channelId)) {
+ closeChannelOk(channelId);
+ } else if (method.getName().startsWith("receive")) {
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
+ "Unknown channel id: " + channelId, channelId);
+ } else if (method.getName().equals("ignoreAllButCloseOk")) {
+ return channelAwaitingClosure(channelId);
+ }
+ return null;
+ }
+ });
+ }
+ return channelMethodProcessor;
}
@Override
- public boolean ignoreAllButCloseOk() {
- return false;
+ public void setCurrentMethod(int classId, int methodId) {
+ currentClassId = classId;
+ currentMethodId = methodId;
}
- private void setProtocolVersion(ProtocolVersion pv) {
- this.protocolVersion = pv;
+ void assertState(final ConnectionState requiredState) {
+ if (state != requiredState) {
+ String replyText = "Command Invalid, expected " + requiredState + " but was " + state;
+ sendConnectionClose(ErrorCodes.COMMAND_INVALID, replyText, 0);
+ throw new RuntimeException(replyText);
+ }
}
- private int getDefaultServerMaxFrameSize() {
- return this.amqpServer.getEventMeshAmqpConfiguration().defaultNetworkBufferSize - AMQPFrame.NON_BODY_SIZE;
+ public boolean channelAwaitingClosure(int channelId) {
+ return ignoreAllButCloseOk() || (!closingChannelsList.isEmpty()
+ && closingChannelsList.containsKey(channelId));
}
- private void setMaxFrameSize(int frameSize) {
- this.maxFrameSize = frameSize;
- Attribute<Integer> maxFrameSizeAttr = this.ctx.channel().attr(ProtocolKey.MAX_FRAME_SIZE);
- maxFrameSizeAttr.setIfAbsent(frameSize);
+ public void completeAndCloseAllChannels() {
+ try {
+ receivedCompleteAllChannels();
+ } finally {
+ closeAllChannels();
+ }
}
- public void initHeartBeatHandler(long writerIdle, long readerIdle) {
+ private void receivedCompleteAllChannels() {
+ RuntimeException exception = null;
- this.ctx.pipeline().addFirst("idleStateHandler", new IdleStateHandler(readerIdle, writerIdle, 0,
- TimeUnit.MILLISECONDS));
- this.ctx.pipeline().addLast("connectionIdleHandler", new ConnectionIdleHandler());
+ for (AmqpChannel channel : channels.values()) {
+ try {
+ channel.receivedComplete();
+ } catch (RuntimeException exceptionForThisChannel) {
+ if (exception == null) {
+ exception = exceptionForThisChannel;
+ }
+ log.error("error informing channel that receiving is complete. Channel:{} ", channel,
+ exceptionForThisChannel);
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
}
- void assertState(final ConnectionState requiredState) {
- if (state != requiredState) {
- String replyText = "Command Invalid, expected " + requiredState + " but was " + state;
- AmqpUtils.sendConnectionClose(ErrorCodes.COMMAND_INVALID, replyText, 0);
- throw new RuntimeException(replyText);
+ public void writeFrame(AMQData frame) {
+ if (log.isDebugEnabled()) {
+ log.debug("send: {}", frame);
}
+ getCtx().writeAndFlush(frame);
}
- public boolean channelAwaitingClosure(int channelId) {
- return ignoreAllButCloseOk() || (!closingChannelsList.isEmpty()
- && closingChannelsList.containsKey(channelId));
+ public void writeMethod(com.rabbitmq.client.Method method, int channelNo) {
+ if (log.isDebugEnabled()) {
+ log.debug("send method:{}", method);
+ }
+ try {
+ Frame frame = ((com.rabbitmq.client.impl.Method) method).toFrame(channelNo);
+ writeFrame(AMQPFrame.get(frame));
+ } catch (IOException e) {
+ log.error("write method frame exce.", e);
+ close();
+ }
}
- public MetaMessageService getAmqpBrokerService() {
- return metaMessageService;
+ public void initHeartBeatHandler(long writerIdle, long readerIdle) {
+
+ this.ctx.pipeline().addFirst("idleStateHandler", new IdleStateHandler(readerIdle, writerIdle, 0,
+ TimeUnit.MILLISECONDS));
+ this.ctx.pipeline().addLast("connectionIdleHandler", new ConnectionIdleHandler());
+
}
class ConnectionIdleHandler extends ChannelDuplexHandler {
- @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
- logger.error("heartbeat timeout close remoteSocketAddress [{}]",
+ log.error("heartbeat timeout close remoteSocketAddress [{}]",
AmqpConnection.this.remoteAddress.toString());
AmqpConnection.this.close();
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
- logger.warn("heartbeat write idle [{}]", AmqpConnection.this.remoteAddress.toString());
- AmqpUtils.writeFrame(AMQPFrame.get(new Frame(AMQP.FRAME_HEARTBEAT, 0)));
+ log.debug("heartbeat write idle [{}]", AmqpConnection.this.remoteAddress.toString());
+ writeFrame(new AMQPFrame(AMQP.FRAME_HEARTBEAT, 0));
}
}
+
super.userEventTriggered(ctx, evt);
}
+
+ }
+
+ public void setMaxFrameSize(int frameMax) {
+ maxFrameSize = frameMax;
+ if (ctx.channel().attr(AmqpCodeDecoder.ATTRIBUTE_KEY_MAX_FRAME_SIZE) != null) {
+ ctx.channel().attr(AmqpCodeDecoder.ATTRIBUTE_KEY_MAX_FRAME_SIZE).set(frameMax);
+ }
+
+ }
+
+ public AmqpChannel getChannel(int channelId) {
+ final AmqpChannel channel = channels.get(channelId);
+ if ((channel == null) || channel.isClosing()) {
+ return null;
+ } else {
+ return channel;
+ }
+ }
+
+ public boolean isClosing() {
+ return orderlyClose.get();
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk() {
+ return isClosing();
+ }
+
+ public void closeChannelOk(int channelId) {
+ closingChannelsList.remove(channelId);
+ }
+
+ private void markChannelAwaitingCloseOk(int channelId) {
+ closingChannelsList.put(channelId, System.currentTimeMillis());
+ }
+
+ private void removeChannel(int channelId) {
+ synchronized (channelAddRemoveLock) {
+ channels.remove(channelId);
+ }
+ }
+
+ public void closeChannel(AmqpChannel channel) {
+ closeChannel(channel, false);
+ }
+
+ public void closeChannelAndWriteFrame(AmqpChannel channel, int cause, String message) {
+ writeMethod(this.commandFactory.createChannelCloseBody(cause, message,
+ currentClassId, currentMethodId), channel.getChannelId());
+ closeChannel(channel, true);
+ }
+
+ void closeChannel(AmqpChannel channel, boolean mark) {
+ int channelId = channel.getChannelId();
+ try {
+ channel.close();
+ if (mark) {
+ markChannelAwaitingCloseOk(channelId);
+ }
+ } finally {
+ removeChannel(channelId);
+ }
+ }
+
+ private void closeAllChannels() {
+ RuntimeException exception = null;
+ try {
+ for (AmqpChannel channel : channels.values()) {
+ try {
+ channel.close();
+ } catch (RuntimeException exceptionForThisChannel) {
+ if (exception == null) {
+ exception = exceptionForThisChannel;
+ }
+ log.error("error informing channel that receiving is complete. Channel:{}", channel,
+ exceptionForThisChannel);
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ } finally {
+ synchronized (channelAddRemoveLock) {
+ channels.clear();
+ }
+ }
+ }
+
+ public void block() {
+ synchronized (channelAddRemoveLock) {
+ if (blocked.compareAndSet(false, true)) {
+ for (AmqpChannel channel : channels.values()) {
+ channel.block();
+ }
+ }
+ }
+ }
+
+
+ public int getMaxChannels() {
+ return maxChannels;
+ }
+
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public int getHeartBeat() {
+ return heartBeat;
+ }
+
+ public String getVirtualHostName() {
+ return virtualHostName;
+ }
+
+
+// public AmqpMessageSender getAmqpOutputConverter() {
+// return amqpOutputConverter;
+// }
+
+ public String getConnectionId() {
+ return connectionId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AmqpConnection that = (AmqpConnection) o;
+ return connectionId.equals(that.connectionId) && Objects.equals(virtualHostName, that.virtualHostName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(connectionId, virtualHostName);
+ }
+
+
+ public ConnectionState getState() {
+ return state;
+ }
+
+ public long getChannelNum() {
+ return channels.size();
+ }
+
+ public CommandFactory getCommandFactory() {
+ return commandFactory;
+ }
+
+ public boolean isWritable() {
+ return ctx.channel().isWritable();
+ }
+
+ public boolean isActive() {
+ return ctx.channel().isActive();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org