You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/18 02:42:47 UTC
[58/65] [abbrv] activemq-artemis git commit: Refactoring between
Connection and protocol manager
Refactoring between Connection and protocol manager
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b0896b35
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b0896b35
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b0896b35
Branch: refs/heads/refactor-openwire
Commit: b0896b353bf5ea60fba9f81b296791fd27535cf7
Parents: 4f39e04
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 24 22:30:28 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Mar 17 14:10:46 2016 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 331 ++++++++++++++-----
.../openwire/OpenWireProtocolManager.java | 325 +++---------------
.../core/protocol/openwire/amq/AMQConsumer.java | 20 +-
.../openwire/amq/AMQProducerBrokerExchange.java | 96 ------
.../openwire/amq/AMQServerConsumer.java | 12 +
.../core/protocol/openwire/amq/AMQSession.java | 21 +-
.../artemis/core/server/ServerConsumer.java | 6 +
.../server/SlowConsumerDetectionListener.java | 22 ++
.../artemis/core/server/impl/QueueImpl.java | 2 +
.../core/server/impl/ServerConsumerImpl.java | 65 +---
10 files changed, 369 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 991f24b..6f2e3be 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -23,37 +23,45 @@ import javax.jms.ResourceAllocationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -102,33 +110,32 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final OpenWireProtocolManager protocolManager;
- private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
-
private boolean destroyed = false;
private final Object sendLock = new Object();
- private final Acceptor acceptorUsed;
-
private final OpenWireFormat wireFormat;
private AMQConnectionContext context;
- private Throwable stopError = null;
-
private final AtomicBoolean stopping = new AtomicBoolean(false);
- private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
-
- protected final List<Command> dispatchQueue = new LinkedList<>();
-
private boolean inServiceException;
private final AtomicBoolean asyncException = new AtomicBoolean(false);
+ // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
+ private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
+
+
private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new HashMap<>();
private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new HashMap<>();
+ // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
+ private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
+
+
+
private ConnectionState state;
private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<>();
@@ -139,14 +146,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private String defaultSocketURIString;
- public OpenWireConnection(Acceptor acceptorUsed,
- Connection connection,
+ public OpenWireConnection(Connection connection,
Executor executor,
OpenWireProtocolManager openWireProtocolManager,
OpenWireFormat wf) {
super(connection, executor);
this.protocolManager = openWireProtocolManager;
- this.acceptorUsed = acceptorUsed;
this.wireFormat = wf;
this.defaultSocketURIString = connection.getLocalAddress();
}
@@ -322,8 +327,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- // throw a WireFormatInfo to the peer
- public void init() {
+ // send a WireFormatInfo to the peer
+ public void sendHandshake() {
WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
sendCommand(info);
}
@@ -590,7 +595,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
}
try {
- protocolManager.removeConnection(this, this.getConnectionInfo(), me);
+ protocolManager.removeConnection(this.getConnectionInfo(), me);
}
catch (InvalidClientIDException e) {
ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e);
@@ -681,40 +686,185 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
context.incRefCount();
}
- /** This will answer with commands to the client */
+ /**
+ * This will answer with commands to the client
+ */
public boolean sendCommand(final Command command) {
if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
ActiveMQServerLogger.LOGGER.trace("sending " + command);
}
- synchronized (this) {
- if (isDestroyed()) {
- return false;
+
+ if (isDestroyed()) {
+ return false;
+ }
+
+ try {
+ physicalSend(command);
+ }
+ catch (Exception e) {
+ return false;
+ }
+ catch (Throwable t) {
+ return false;
+ }
+ return true;
+ }
+
+ public void addDestination(DestinationInfo info) throws Exception {
+ ActiveMQDestination dest = info.getDestination();
+ if (dest.isQueue()) {
+ SimpleString qName = OpenWireUtil.toCoreAddress(dest);
+ QueueBinding binding = (QueueBinding) protocolManager.getServer().getPostOffice().getBinding(qName);
+ if (binding == null) {
+ if (getState().getInfo() != null) {
+
+ CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
+ protocolManager.getServer().getSecurityStore().check(qName, checkType, this);
+
+ protocolManager.getServer().checkQueueCreationLimit(getUsername());
+ }
+ ConnectionInfo connInfo = getState().getInfo();
+ protocolManager.getServer().createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
}
- try {
- physicalSend(command);
+ if (dest.isTemporary()) {
+ registerTempQueue(dest);
}
- catch (Exception e) {
- return false;
+ }
+
+ if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+ AMQConnectionContext context = getContext();
+ DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
+
+ ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+ protocolManager.fireAdvisory(context, topic, advInfo);
+ }
+ }
+
+
+ public void updateConsumer(ConsumerControl consumerControl) {
+ SessionId sessionId = consumerControl.getConsumerId().getParentId();
+ AMQSession amqSession = sessions.get(sessionId);
+ amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
+ }
+
+ public void addConsumer(ConsumerInfo info) throws Exception {
+ // Todo: add a destination interceptors holder here (amq supports this)
+ SessionId sessionId = info.getConsumerId().getParentId();
+ ConnectionId connectionId = sessionId.getParentId();
+ ConnectionState cs = getState();
+ if (cs == null) {
+ throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
+ }
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss == null) {
+ throw new IllegalStateException(protocolManager.getServer() + " Cannot add a consumer to a session that had not been registered: " + sessionId);
+ }
+ // Avoid replaying dup commands
+ if (!ss.getConsumerIds().contains(info.getConsumerId())) {
+
+ AMQSession amqSession = sessions.get(sessionId);
+ if (amqSession == null) {
+ throw new IllegalStateException("Session not exist! : " + sessionId);
+ }
+
+ amqSession.createConsumer(info, amqSession, new SlowConsumerDetection());
+
+ ss.addConsumer(info);
+ }
+ }
+
+ class SlowConsumerDetection implements SlowConsumerDetectionListener {
+
+ @Override
+ public void onSlowConsumer(ServerConsumer consumer) {
+ if (consumer instanceof AMQServerConsumer) {
+ AMQServerConsumer serverConsumer = (AMQServerConsumer)consumer;
+ ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(serverConsumer.getAmqConsumer().getDestination());
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ try {
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, serverConsumer.getAmqConsumer().getId().toString());
+ protocolManager.fireAdvisory(context, topic, advisoryMessage, serverConsumer.getAmqConsumer().getId());
+ }
+ catch (Exception e) {
+ // TODO-NOW: LOGGING
+ e.printStackTrace();
+ }
}
- catch (Throwable t) {
- return false;
+ }
+ }
+
+ public void addSessions(Set<SessionId> sessionSet) {
+ Iterator<SessionId> iter = sessionSet.iterator();
+ while (iter.hasNext()) {
+ SessionId sid = iter.next();
+ addSession(getState().getSessionState(sid).getInfo(), true);
+ }
+ }
+
+ public AMQSession addSession(SessionInfo ss) {
+ return addSession(ss, false);
+ }
+
+ public AMQSession addSession(SessionInfo ss, boolean internal) {
+ AMQSession amqSession = new AMQSession(getState().getInfo(), ss, protocolManager.getServer(), this, protocolManager.getScheduledPool(), protocolManager);
+ amqSession.initialize();
+ amqSession.setInternal(internal);
+ sessions.put(ss.getSessionId(), amqSession);
+ sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
+ return amqSession;
+ }
+
+ public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
+ AMQSession session = sessions.remove(info.getSessionId());
+ if (session != null) {
+ session.close();
+ }
+ }
+
+ public AMQSession getSession(SessionId sessionId) {
+ return sessions.get(sessionId);
+ }
+
+ public void removeDestination(ActiveMQDestination dest) throws Exception {
+ if (dest.isQueue()) {
+ SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
+ protocolManager.getServer().destroyQueue(qName);
+ }
+ else {
+ Bindings bindings = protocolManager.getServer().getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
+ Iterator<Binding> iterator = bindings.getBindings().iterator();
+
+ while (iterator.hasNext()) {
+ Queue b = (Queue) iterator.next().getBindable();
+ if (b.getConsumerCount() > 0) {
+ throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
+ }
+ if (b.isDurable()) {
+ throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
+ }
+ b.deleteQueue();
}
- return true;
+ }
+
+ if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+ AMQConnectionContext context = getContext();
+ DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
+
+ ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+ protocolManager.fireAdvisory(context, topic, advInfo);
}
}
// This will listen for commands throught the protocolmanager
public class CommandProcessor implements CommandVisitor {
-
public AMQConnectionContext getContext() {
return OpenWireConnection.this.getContext();
}
@Override
public Response processAddConnection(ConnectionInfo info) throws Exception {
- //let protoclmanager handle connection add/remove
try {
protocolManager.addConnection(OpenWireConnection.this, info);
}
@@ -739,7 +889,36 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processAddProducer(ProducerInfo info) throws Exception {
Response resp = null;
try {
- protocolManager.addProducer(OpenWireConnection.this, info);
+ SessionId sessionId = info.getProducerId().getParentId();
+ ConnectionId connectionId = sessionId.getParentId();
+ ConnectionState cs = getState();
+ if (cs == null) {
+ throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
+ }
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss == null) {
+ throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+ }
+ // Avoid replaying dup commands
+ if (!ss.getProducerIds().contains(info.getProducerId())) {
+
+ AMQSession amqSession = sessions.get(sessionId);
+ if (amqSession == null) {
+ throw new IllegalStateException("Session not exist! : " + sessionId);
+ }
+
+ ActiveMQDestination destination = info.getDestination();
+ if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+ if (destination.isQueue()) {
+ OpenWireUtil.validateDestination(destination, amqSession);
+ }
+ DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+ OpenWireConnection.this.addDestination(destInfo);
+ }
+
+ ss.addProducer(info);
+
+ }
}
catch (Exception e) {
if (e instanceof ActiveMQSecurityException) {
@@ -759,7 +938,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processAddConsumer(ConsumerInfo info) {
Response resp = null;
try {
- protocolManager.addConsumer(OpenWireConnection.this, info);
+ addConsumer(info);
}
catch (Exception e) {
e.printStackTrace();
@@ -776,13 +955,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRemoveDestination(DestinationInfo info) throws Exception {
ActiveMQDestination dest = info.getDestination();
- protocolManager.removeDestination(OpenWireConnection.this, dest);
+ removeDestination(dest);
return null;
}
@Override
public Response processRemoveProducer(ProducerId id) throws Exception {
- protocolManager.removeProducer(id);
+
+ // TODO-now: proper implement this method
return null;
}
@@ -807,7 +987,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
state.removeSession(id);
- protocolManager.removeSession(context, session.getInfo());
+ removeSession(context, session.getInfo());
return null;
}
@@ -843,7 +1023,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processAddDestination(DestinationInfo dest) throws Exception {
Response resp = null;
try {
- protocolManager.addDestination(OpenWireConnection.this, dest);
+ addDestination(dest);
}
catch (Exception e) {
if (e instanceof ActiveMQSecurityException) {
@@ -860,14 +1040,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processAddSession(SessionInfo info) throws Exception {
// Avoid replaying dup commands
if (!state.getSessionIds().contains(info.getSessionId())) {
- protocolManager.addSession(OpenWireConnection.this, info);
- try {
- state.addSession(info);
- }
- catch (IllegalStateException e) {
- e.printStackTrace();
- protocolManager.removeSession(context, info);
- }
+ addSession(info);
+ state.addSession(info);
}
return null;
}
@@ -923,7 +1097,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
//amq5 clients send this command to restore prefetchSize
//after successful reconnect
try {
- protocolManager.updateConsumer(OpenWireConnection.this, consumerControl);
+ updateConsumer(consumerControl);
}
catch (Exception e) {
//log error
@@ -976,33 +1150,31 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
- AMQSession session = protocolManager.getSession(producerId.getParentId());
+ AMQSession session = getSession(producerId.getParentId());
- // TODO: canDispatch is always returning true;
- if (producerExchange.canDispatch(messageSend)) {
- SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
- if (result.isBlockNextSend()) {
- if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
- // TODO see logging
- throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
- }
+ SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+ if (result.isBlockNextSend()) {
+ if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
+ // TODO see logging
+ throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+ }
- if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
- //in that case don't send the response
- //this will force the client to wait until
- //the response is got.
- context.setDontSendReponse(true);
- }
- else {
- //hang the connection until the space is available
- session.blockingWaitForSpace(producerExchange, result);
- }
+ if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
+ //in that case don't send the response
+ //this will force the client to wait until
+ //the response is got.
+ context.setDontSendReponse(true);
}
- else if (sendProducerAck) {
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- OpenWireConnection.this.dispatchAsync(ack);
+ else {
+ //hang the connection until the space is available
+ session.blockingWaitForSpace(producerExchange, result);
}
}
+ else if (sendProducerAck) {
+ // TODO-now: send through OperationContext
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ OpenWireConnection.this.dispatchAsync(ack);
+ }
}
catch (Throwable e) {
if (e instanceof ActiveMQSecurityException) {
@@ -1056,15 +1228,26 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
Set<SessionId> sIds = state.getSessionIds();
- TransactionId[] recovered = protocolManager.recoverTransactions(sIds);
- return new DataArrayResponse(recovered);
+
+
+ List<TransactionId> recovered = new ArrayList<>();
+ if (sIds != null) {
+ for (SessionId sid : sIds) {
+ AMQSession s = sessions.get(sid);
+ if (s != null) {
+ s.recover(recovered);
+ }
+ }
+ }
+
+ return new DataArrayResponse(recovered.toArray(new TransactionId[0]));
}
@Override
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
//we let protocol manager to handle connection add/remove
try {
- protocolManager.removeConnection(OpenWireConnection.this, state.getInfo(), null);
+ protocolManager.removeConnection(state.getInfo(), null);
}
catch (Throwable e) {
// log
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index add1455..bdf27f8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,15 +17,12 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -40,26 +37,14 @@ import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
-import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -69,7 +54,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
@@ -78,31 +62,24 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener, ClusterTopologyListener {
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -127,21 +104,16 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
+
// Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id)
private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
private String brokerName;
- // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
- private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
-
// Clebert: Artemis already has a Resource Manager. Need to remove this..
// The TransactionID extends XATransactionID, so all we need is to convert the XID here
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
- // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
- private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
-
private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
private final LinkedList<TopologyMember> members = new LinkedList<>();
@@ -163,9 +135,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
ManagementService service = server.getManagementService();
scheduledPool = server.getScheduledPool();
- if (service != null) {
- service.addNotificationListener(this);
- }
final ClusterManager clusterManager = this.server.getClusterManager();
ClusterConnection cc = clusterManager.getDefaultConnection(null);
@@ -187,6 +156,35 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
}
}
+
+ public void removeConnection(ConnectionInfo info,
+ Throwable error) throws InvalidClientIDException {
+ synchronized (clientIdSet) {
+ String clientId = info.getClientId();
+ if (clientId != null) {
+ AMQConnectionContext context = this.clientIdSet.get(clientId);
+ if (context != null && context.decRefCount() == 0) {
+ //connection is still there and need to close
+ context.getConnection().disconnect(error != null);
+ this.connections.remove(this);//what's that for?
+ this.clientIdSet.remove(clientId);
+ }
+ }
+ else {
+ throw new InvalidClientIDException("No clientID specified for connection disconnect request");
+ }
+ }
+ }
+
+
+ public ScheduledExecutorService getScheduledPool() {
+ return scheduledPool;
+ }
+
+ public ActiveMQServer getServer() {
+ return server;
+ }
+
private void updateClientClusterInfo() {
synchronized (members) {
@@ -219,8 +217,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
- OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, server.getExecutorFactory().getExecutor(), this, wf);
- owConn.init();
+ OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
+ owConn.sendHandshake();
// TODO CLEBERT What is this constant here? we should get it from TTL initial pings
return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
@@ -233,7 +231,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public void removeHandler(String name) {
- // TODO Auto-generated method stub
}
@Override
@@ -276,8 +273,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
- // TODO Auto-generated method stub
-
}
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
@@ -322,11 +317,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
fireAdvisory(context, topic, copy);
// init the conn
- addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
+ context.getConnection().addSessions( context.getConnectionState().getSessionIds());
}
}
- private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
+ public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
this.fireAdvisory(context, topic, copy, null);
}
@@ -341,7 +336,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
/*
* See AdvisoryBroker.fireAdvisory()
*/
- private void fireAdvisory(AMQConnectionContext context,
+ public void fireAdvisory(AMQConnectionContext context,
ActiveMQTopic topic,
Command command,
ConsumerId targetConsumerId) throws Exception {
@@ -448,198 +443,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
public boolean isStopping() {
return false;
}
-
- public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception {
- SessionId sessionId = info.getProducerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
- ConnectionState cs = theConn.getState();
- if (cs == null) {
- throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
- }
- SessionState ss = cs.getSessionState(sessionId);
- if (ss == null) {
- throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
- }
- // Avoid replaying dup commands
- if (!ss.getProducerIds().contains(info.getProducerId())) {
-
- AMQSession amqSession = sessions.get(sessionId);
- if (amqSession == null) {
- throw new IllegalStateException("Session not exist! : " + sessionId);
- }
-
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
- if (destination.isQueue()) {
- OpenWireUtil.validateDestination(destination, amqSession);
- }
- DestinationInfo destInfo = new DestinationInfo(theConn.getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
- this.addDestination(theConn, destInfo);
- }
-
- amqSession.createProducer(info);
-
- try {
- ss.addProducer(info);
- }
- catch (IllegalStateException e) {
- amqSession.removeProducer(info);
- }
-
- }
-
- }
-
- public void updateConsumer(OpenWireConnection theConn, ConsumerControl consumerControl) {
- SessionId sessionId = consumerControl.getConsumerId().getParentId();
- AMQSession amqSession = sessions.get(sessionId);
- amqSession.updateConsumerPrefetchSize(consumerControl.getConsumerId(), consumerControl.getPrefetch());
- }
-
- public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
- // Todo: add a destination interceptors holder here (amq supports this)
- SessionId sessionId = info.getConsumerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
- ConnectionState cs = theConn.getState();
- if (cs == null) {
- throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
- }
- SessionState ss = cs.getSessionState(sessionId);
- if (ss == null) {
- throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
- }
- // Avoid replaying dup commands
- if (!ss.getConsumerIds().contains(info.getConsumerId())) {
-
- AMQSession amqSession = sessions.get(sessionId);
- if (amqSession == null) {
- throw new IllegalStateException("Session not exist! : " + sessionId);
- }
-
- amqSession.createConsumer(info, amqSession);
-
- ss.addConsumer(info);
- }
- }
-
- public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) {
- Iterator<SessionId> iter = sessionSet.iterator();
- while (iter.hasNext()) {
- SessionId sid = iter.next();
- addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), true);
- }
- }
-
- public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) {
- return addSession(theConn, ss, false);
- }
-
- public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) {
- AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, server, theConn, scheduledPool, this);
- amqSession.initialize();
- amqSession.setInternal(internal);
- sessions.put(ss.getSessionId(), amqSession);
- sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
- return amqSession;
- }
-
- public void removeConnection(OpenWireConnection connection,
- ConnectionInfo info,
- Throwable error) throws InvalidClientIDException {
- synchronized (clientIdSet) {
- String clientId = info.getClientId();
- if (clientId != null) {
- AMQConnectionContext context = this.clientIdSet.get(clientId);
- if (context != null && context.decRefCount() == 0) {
- //connection is still there and need to close
- this.clientIdSet.remove(clientId);
- connection.disconnect(error != null);
- this.connections.remove(connection);//what's that for?
- }
- }
- else {
- throw new InvalidClientIDException("No clientID specified for connection disconnect request");
- }
- }
- }
-
- public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
- AMQSession session = sessions.remove(info.getSessionId());
- if (session != null) {
- session.close();
- }
- }
-
- public void removeProducer(ProducerId id) {
- SessionId sessionId = id.getParentId();
- AMQSession session = sessions.get(sessionId);
- session.removeProducer(id);
- }
-
- public AMQSession getSession(SessionId sessionId) {
- return sessions.get(sessionId);
- }
-
- public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
- if (dest.isQueue()) {
- SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
- this.server.destroyQueue(qName);
- }
- else {
- Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
- Iterator<Binding> iterator = bindings.getBindings().iterator();
-
- while (iterator.hasNext()) {
- Queue b = (Queue) iterator.next().getBindable();
- if (b.getConsumerCount() > 0) {
- throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
- }
- if (b.isDurable()) {
- throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
- }
- b.deleteQueue();
- }
- }
-
- if (!AdvisorySupport.isAdvisoryTopic(dest)) {
- AMQConnectionContext context = connection.getContext();
- DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
-
- ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
- fireAdvisory(context, topic, advInfo);
- }
- }
-
- public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
- ActiveMQDestination dest = info.getDestination();
- if (dest.isQueue()) {
- SimpleString qName = OpenWireUtil.toCoreAddress(dest);
- QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
- if (binding == null) {
- if (connection.getState().getInfo() != null) {
-
- CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
- server.getSecurityStore().check(qName, checkType, connection);
-
- server.checkQueueCreationLimit(connection.getUsername());
- }
- ConnectionInfo connInfo = connection.getState().getInfo();
- this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
- }
- if (dest.isTemporary()) {
- connection.registerTempQueue(dest);
- }
- }
-
- if (!AdvisorySupport.isAdvisoryTopic(dest)) {
- AMQConnectionContext context = connection.getContext();
- DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
-
- ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
- fireAdvisory(context, topic, advInfo);
- }
- }
-
public void endTransaction(TransactionInfo info) throws Exception {
AMQSession txSession = transactions.get(info.getTransactionId());
@@ -682,19 +485,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
transactions.remove(info.getTransactionId());
}
- public TransactionId[] recoverTransactions(Set<SessionId> sIds) {
- List<TransactionId> recovered = new ArrayList<>();
- if (sIds != null) {
- for (SessionId sid : sIds) {
- AMQSession s = this.sessions.get(sid);
- if (s != null) {
- s.recover(recovered);
- }
- }
- }
- return recovered.toArray(new TransactionId[0]);
- }
-
public boolean validateUser(String login, String passcode) {
boolean validated = true;
@@ -717,50 +507,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
/**
* TODO: remove this, use the regular ResourceManager from the Server's
- * */
+ */
public void registerTx(TransactionId txId, AMQSession amqSession) {
transactions.put(txId, amqSession);
}
- //advisory support
- @Override
- public void onNotification(Notification notif) {
- try {
- if (notif.getType() instanceof CoreNotificationType) {
- CoreNotificationType type = (CoreNotificationType) notif.getType();
- switch (type) {
- case CONSUMER_SLOW:
- fireSlowConsumer(notif);
- break;
- default:
- break;
- }
- }
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
- }
- }
-
- private void fireSlowConsumer(Notification notif) throws Exception {
- SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
- Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
- SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
- AMQSession session = sessions.get(sessionId);
- AMQConsumer consumer = session.getConsumer(coreConsumerId);
- ActiveMQDestination destination = consumer.getDestination();
-
- if (!AdvisorySupport.isAdvisoryTopic(destination)) {
- ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
- ConnectionId connId = sessionId.getParentId();
- OpenWireConnection cc = this.brokerConnectionStates.get(connId);
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
-
- fireAdvisory(cc.getContext(), topic, advisoryMessage, consumer.getId());
- }
- }
-
public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
@@ -795,7 +546,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
return this.updateClusterClients;
}
- public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
+ public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index b0f007a..221679f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,7 +27,14 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@@ -36,14 +43,9 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
public class AMQConsumer implements BrowserListener {
+
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination actualDest;
private ConsumerInfo info;
@@ -72,7 +74,7 @@ public class AMQConsumer implements BrowserListener {
}
}
- public void init() throws Exception {
+ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
AMQServerSession coreSession = session.getCoreSession();
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
@@ -127,7 +129,9 @@ public class AMQConsumer implements BrowserListener {
coreSession.createQueue(address, subQueueName, selector, true, false);
}
- coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
+ AMQServerConsumer serverConsumer = (AMQServerConsumer) coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
+ serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+ serverConsumer.setAmqConsumer(this);
}
else {
SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index e9c4044..b5d8dbd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
import org.apache.activemq.state.ProducerState;
public class AMQProducerBrokerExchange {
@@ -28,7 +26,6 @@ public class AMQProducerBrokerExchange {
private AMQConnectionContext connectionContext;
private ProducerState producerState;
private boolean mutable = true;
- private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
private final FlowControlInfo flowControlInfo = new FlowControlInfo();
public AMQProducerBrokerExchange() {
@@ -57,13 +54,6 @@ public class AMQProducerBrokerExchange {
}
/**
- * @return the mutable
- */
- public boolean isMutable() {
- return this.mutable;
- }
-
- /**
* @param mutable the mutable to set
*/
public void setMutable(boolean mutable) {
@@ -84,75 +74,13 @@ public class AMQProducerBrokerExchange {
this.producerState = producerState;
}
- /**
- * Enforce duplicate suppression using info from persistence adapter
- *
- * @return false if message should be ignored as a duplicate
- */
- public boolean canDispatch(Message messageSend) {
- // TODO: auditProduceSequenceIds is never true
- boolean canDispatch = true;
- //TODO: DEAD CODE
-// if (auditProducerSequenceIds && messageSend.isPersistent()) {
-// final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
-// if (isNetworkProducer) {
-// // messages are multiplexed on this producer so we need to query the
-// // persistenceAdapter
-// long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
-// if (producerSequenceId <= lastStoredForMessageProducer) {
-// canDispatch = false;
-// }
-// }
-// else if (producerSequenceId <= lastSendSequenceNumber.get()) {
-// canDispatch = false;
-// // TODO: WHAT IS THIS?
-// if (messageSend.isInTransaction()) {
-//
-//
-// }
-// else {
-// }
-// }
-// else {
-// // track current so we can suppress duplicates later in the stream
-// lastSendSequenceNumber.set(producerSequenceId);
-// }
-// }
- return canDispatch;
- }
-
- private long getStoredSequenceIdForMessage(MessageId messageId) {
- return -1;
- }
-
public void setLastStoredSequenceId(long l) {
}
- public void incrementSend() {
- flowControlInfo.incrementSend();
- }
-
public void blockingOnFlowControl(boolean blockingOnFlowControl) {
flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
}
- public boolean isBlockedForFlowControl() {
- return flowControlInfo.isBlockingOnFlowControl();
- }
-
- public void resetFlowControl() {
- flowControlInfo.reset();
- }
-
- public long getTotalTimeBlocked() {
- return flowControlInfo.getTotalTimeBlocked();
- }
-
- public int getPercentageBlocked() {
- double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
- return (int) value * 100;
- }
-
public static class FlowControlInfo {
private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
@@ -160,10 +88,6 @@ public class AMQProducerBrokerExchange {
private AtomicLong sendsBlocked = new AtomicLong();
private AtomicLong totalTimeBlocked = new AtomicLong();
- public boolean isBlockingOnFlowControl() {
- return blockingOnFlowControl.get();
- }
-
public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
this.blockingOnFlowControl.set(blockingOnFlowControl);
if (blockingOnFlowControl) {
@@ -171,30 +95,10 @@ public class AMQProducerBrokerExchange {
}
}
- public long getTotalSends() {
- return totalSends.get();
- }
-
- public void incrementSend() {
- this.totalSends.incrementAndGet();
- }
-
- public long getSendsBlocked() {
- return sendsBlocked.get();
- }
-
public void incrementSendBlocked() {
this.sendsBlocked.incrementAndGet();
}
- public long getTotalTimeBlocked() {
- return totalTimeBlocked.get();
- }
-
- public void incrementTimeBlocked(long time) {
- this.totalTimeBlocked.addAndGet(time);
- }
-
public void reset() {
blockingOnFlowControl.set(false);
totalSends.set(0);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index 3e7afa5..f198cb7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -34,6 +34,18 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class AMQServerConsumer extends ServerConsumerImpl {
+ // TODO-NOW: remove this once unified
+ AMQConsumer amqConsumer;
+
+ public AMQConsumer getAmqConsumer() {
+ return amqConsumer;
+ }
+
+ /** TODO-NOW: remove this once unified */
+ public void setAmqConsumer(AMQConsumer amqConsumer) {
+ this.amqConsumer = amqConsumer;
+ }
+
public AMQServerConsumer(long consumerID,
AMQServerSession serverSession,
QueueBinding binding,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 0cee3d3..d16d4c8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
@@ -71,8 +72,6 @@ public class AMQSession implements SessionCallback {
private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
- private Map<Long, AMQProducer> producers = new HashMap<>();
-
private AtomicBoolean started = new AtomicBoolean(false);
private TransactionId txId = null;
@@ -121,7 +120,7 @@ public class AMQSession implements SessionCallback {
}
- public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception {
+ public void createConsumer(ConsumerInfo info, AMQSession amqSession, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
//check destination
ActiveMQDestination dest = info.getDestination();
ActiveMQDestination[] dests = null;
@@ -139,7 +138,7 @@ public class AMQSession implements SessionCallback {
}
AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
- consumer.init();
+ consumer.init(slowConsumerDetectionListener);
consumerMap.put(d, consumer);
consumers.put(consumer.getNativeId(), consumer);
}
@@ -233,20 +232,6 @@ public class AMQSession implements SessionCallback {
consumers.remove(consumerId);
}
- public void createProducer(ProducerInfo info) throws Exception {
- AMQProducer producer = new AMQProducer(this, info);
- producer.init();
- producers.put(info.getProducerId().getValue(), producer);
- }
-
- public void removeProducer(ProducerInfo info) {
- removeProducer(info.getProducerId());
- }
-
- public void removeProducer(ProducerId id) {
- producers.remove(id.getValue());
- }
-
public SendingResult send(AMQProducerBrokerExchange producerExchange,
Message messageSend,
boolean sendProducerAck) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 6045e2c..d75efdd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -25,6 +25,12 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public interface ServerConsumer extends Consumer {
+ void setlowConsumerDetection(SlowConsumerDetectionListener listener);
+
+ SlowConsumerDetectionListener getSlowConsumerDetecion();
+
+ void fireSlowConsumer();
+
/**
* @param protocolContext
* @see #getProtocolContext()
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
new file mode 100644
index 0000000..0c60f25
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+public interface SlowConsumerDetectionListener {
+ void onSlowConsumer(ServerConsumer consumer);
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8bf5d08..86ca36c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2930,6 +2930,8 @@ public class QueueImpl implements Queue {
}
}
+ serverConsumer.fireSlowConsumer();
+
if (connection != null) {
ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
if (policy.equals(SlowConsumerPolicy.KILL)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0896b35/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 422d324..545b4dc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -88,8 +89,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private Object protocolContext;
- private final ActiveMQServer server;
-
/**
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@@ -152,9 +151,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final SessionCallback callback,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
- final ManagementService managementService,
- final ActiveMQServer server) throws Exception {
- this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null, server);
+ final ManagementService managementService) throws Exception {
+ this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null);
}
public ServerConsumerImpl(final long id,
@@ -169,8 +167,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final boolean strictUpdateDeliveryCount,
final ManagementService managementService,
final boolean supportLargeMessage,
- final Integer credits,
- final ActiveMQServer server) throws Exception {
+ final Integer credits) throws Exception {
this.id = id;
this.filter = filter;
@@ -215,8 +212,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
availableCredits.set(credits);
}
}
-
- this.server = server;
}
@Override
@@ -386,9 +381,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
finally {
lockDelivery.readLock().unlock();
- callback.afterDelivery();
}
-
}
@Override
@@ -569,19 +562,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
- boolean locked = lockDelivery();
-
- // This is to make sure nothing would sneak to the client while started = false
- // the client will stop the session and perform a rollback in certain cases.
- // in case something sneaks to the client you could get to messaging delivering forever until
- // you restart the server
+ lockDelivery.writeLock().lock();
try {
this.started = browseOnly || started;
}
finally {
- if (locked) {
- lockDelivery.writeLock().unlock();
- }
+ lockDelivery.writeLock().unlock();
}
}
@@ -591,39 +577,22 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
- private boolean lockDelivery() {
- try {
- if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
- ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
- if (server != null) {
- server.threadDump();
- }
- return false;
- }
- return true;
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- return false;
- }
- }
-
@Override
public void setTransferring(final boolean transferring) {
synchronized (lock) {
- // This is to make sure that the delivery process has finished any pending delivery
- // otherwise a message may sneak in on the client while we are trying to stop the consumer
- boolean locked = lockDelivery();
- try {
- this.transferring = transferring;
- }
- finally {
- if (locked) {
- lockDelivery.writeLock().unlock();
- }
- }
+ this.transferring = transferring;
}
+ // This is to make sure that the delivery process has finished any pending delivery
+ // otherwise a message may sneak in on the client while we are trying to stop the consumer
+ try {
+ lockDelivery.writeLock().lock();
+ }
+ finally {
+ lockDelivery.writeLock().unlock();
+ }
+
+
// Outside the lock
if (transferring) {
// And we must wait for any force delivery to be executed - this is executed async so we add a future to the