You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:46 UTC
[37/42] activemq-artemis git commit: ARTEMIS-463 Refactoring on Openwire
https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index f61705e..89f71ed 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -96,10 +96,11 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
- @Override
- public ServerMessage inbound(Object message) {
- // TODO: implement this
- return null;
+
+ private final WireFormat marshaller;
+
+ public OpenWireMessageConverter(WireFormat marshaller) {
+ this.marshaller = marshaller;
}
@Override
@@ -108,10 +109,13 @@ public class OpenWireMessageConverter implements MessageConverter {
return null;
}
- //convert an ActiveMQ Artemis message to coreMessage
- public static void toCoreMessage(ServerMessageImpl coreMessage,
- Message messageSend,
- WireFormat marshaller) throws IOException {
+
+ @Override
+ public ServerMessage inbound(Object message) throws Exception {
+
+ Message messageSend = (Message)message;
+ ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize());
+
String type = messageSend.getType();
if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
@@ -391,6 +395,15 @@ public class OpenWireMessageConverter implements MessageConverter {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
+
+ ActiveMQDestination origDest = messageSend.getOriginalDestination();
+ if (origDest != null) {
+ ByteSequence origDestBytes = marshaller.marshal(origDest);
+ origDestBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+ }
+
+ return coreMessage;
}
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
@@ -430,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter {
public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount,
AMQConsumer consumer) throws IOException, JMSException {
- ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
+ ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getOpenwireDestination());
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index f916c8f..bbbb696 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,16 +17,13 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
-import java.util.ArrayList;
-import java.util.Collections;
+import javax.transaction.xa.XAException;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,24 +34,16 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
-import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -64,40 +53,31 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -109,32 +89,36 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private OpenWireFormatFactory wireFactory;
- private boolean tightEncodingEnabled = true;
-
private boolean prefixPacketSize = true;
private BrokerId brokerId;
protected final ProducerId advisoryProducerId = new ProducerId();
- // from broker
- protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
-
private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
- protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
-
- private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<>();
+ // TODO-NOW: this can probably go away
+ private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
private String brokerName;
- private Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
-
+ // Clebert: Artemis already has a Resource Manager. Need to remove this..
+ // The TransactionID extends XATransactionID, so all we need is to convert the XID here
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<>();
- private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
+ private final Map<String, TopologyMember> topologyMap = new ConcurrentHashMap<>();
+
+ private final LinkedList<TopologyMember> members = new LinkedList<>();
private final ScheduledExecutorService scheduledPool;
+ //bean properties
+ //http://activemq.apache.org/failover-transport-reference.html
+ private boolean rebalanceClusterClients = false;
+ private boolean updateClusterClients = false;
+ private boolean updateClusterClientsOnRemove = false;
+
+ private final OpenWireMessageConverter messageConverter;
+
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@@ -142,12 +126,82 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
// preferred prop, should be done via config
wireFactory.setCacheEnabled(false);
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
- ManagementService service = server.getManagementService();
scheduledPool = server.getScheduledPool();
- if (service != null) {
- service.addNotificationListener(this);
+ this.messageConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
+
+ final ClusterManager clusterManager = this.server.getClusterManager();
+
+ // TODO-NOW: use a property name for the cluster connection
+ ClusterConnection cc = clusterManager.getDefaultConnection(null);
+
+ if (cc != null) {
+ cc.addClusterTopologyListener(this);
+ }
+ }
+
+ public OpenWireFormat getNewWireFormat() {
+ return (OpenWireFormat)wireFactory.createWireFormat();
+ }
+
+ @Override
+ public void nodeUP(TopologyMember member, boolean last) {
+ if (topologyMap.put(member.getNodeId(), member) == null) {
+ updateClientClusterInfo();
+ }
+ }
+
+ public void nodeDown(long eventUID, String nodeID) {
+ if (topologyMap.remove(nodeID) != null) {
+ updateClientClusterInfo();
+ }
+ }
+
+
+ public void removeConnection(ConnectionInfo info,
+ Throwable error) throws InvalidClientIDException {
+ synchronized (clientIdSet) {
+ String clientId = info.getClientId();
+ if (clientId != null) {
+ AMQConnectionContext context = this.clientIdSet.get(clientId);
+ if (context != null && context.decRefCount() == 0) {
+ //connection is still there and need to close
+ context.getConnection().disconnect(error != null);
+ this.connections.remove(this);//what's that for?
+ this.clientIdSet.remove(clientId);
+ }
+ }
+ else {
+ throw new InvalidClientIDException("No clientID specified for connection disconnect request");
+ }
+ }
+ }
+
+
+ public ScheduledExecutorService getScheduledPool() {
+ return scheduledPool;
+ }
+
+ public ActiveMQServer getServer() {
+ return server;
+ }
+
+ private void updateClientClusterInfo() {
+
+ synchronized (members) {
+ members.clear();
+ members.addAll(topologyMap.values());
}
+ for (OpenWireConnection c : this.connections) {
+ ConnectionControl control = newConnectionControl();
+ try {
+ c.updateClient(control);
+ }
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ c.sendException(e);
+ }
+ }
}
@Override
@@ -169,20 +223,20 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
- OpenWireConnection owConn = new OpenWireConnection(acceptorUsed, connection, this, wf);
- owConn.init();
+ OpenWireConnection owConn = new OpenWireConnection(connection, server.getExecutorFactory().getExecutor(), this, wf);
+ owConn.sendHandshake();
+ // TODO CLEBERT What is this constant here? we should get it from TTL initial pings
return new ConnectionEntry(owConn, null, System.currentTimeMillis(), 1 * 60 * 1000);
}
@Override
public MessageConverter getConverter() {
- return new OpenWireMessageConverter();
+ return messageConverter;
}
@Override
public void removeHandler(String name) {
- // TODO Auto-generated method stub
}
@Override
@@ -225,119 +279,60 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
- // TODO Auto-generated method stub
-
- }
-
- public void handleCommand(OpenWireConnection openWireConnection, Object command) throws Exception {
- Command amqCmd = (Command) command;
- byte type = amqCmd.getDataStructureType();
- switch (type) {
- case CommandTypes.CONNECTION_INFO:
- break;
- case CommandTypes.CONNECTION_CONTROL:
- /** The ConnectionControl packet sent from client informs the broker that is capable of supporting dynamic
- * failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we
- * simply drop the packet. See: ACTIVEMQ6-108 */
- break;
- case CommandTypes.MESSAGE_PULL:
- MessagePull messagePull = (MessagePull) amqCmd;
- openWireConnection.processMessagePull(messagePull);
- break;
- case CommandTypes.CONSUMER_CONTROL:
- break;
- default:
- throw new IllegalStateException("Cannot handle command: " + command);
- }
}
- public void sendReply(final OpenWireConnection connection, final Command command) {
- server.getStorageManager().afterCompleteOperations(new IOCallback() {
- @Override
- public void onError(final int errorCode, final String errorMessage) {
- ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
- }
-
- @Override
- public void done() {
- send(connection, command);
- }
- });
- }
-
- public boolean send(final OpenWireConnection connection, final Command command) {
- if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
- ActiveMQServerLogger.LOGGER.trace("sending " + command);
- }
- synchronized (connection) {
- if (connection.isDestroyed()) {
- return false;
- }
-
- try {
- connection.physicalSend(command);
- }
- catch (Exception e) {
- return false;
- }
- catch (Throwable t) {
- return false;
- }
- return true;
- }
- }
-
- public void addConnection(AMQConnectionContext context, ConnectionInfo info) throws Exception {
+ public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();
if (!this.validateUser(username, password)) {
throw new SecurityException("User name [" + username + "] or password is invalid.");
}
+
String clientId = info.getClientId();
if (clientId == null) {
throw new InvalidClientIDException("No clientID specified for connection request");
}
+
synchronized (clientIdSet) {
- AMQConnectionContext oldContext = clientIdSet.get(clientId);
- if (oldContext != null) {
- if (context.isAllowLinkStealing()) {
- clientIdSet.remove(clientId);
- if (oldContext.getConnection() != null) {
- OpenWireConnection connection = oldContext.getConnection();
- connection.disconnect(true);
- }
- else {
- // log error
- }
+ AMQConnectionContext context;
+ context = clientIdSet.get(clientId);
+ if (context != null) {
+ if (info.isFailoverReconnect()) {
+ OpenWireConnection oldConnection = context.getConnection();
+ oldConnection.disconnect(true);
+ connections.remove(oldConnection);
+ connection.reconnect(context, info);
}
else {
- throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + oldContext.getConnection().getRemoteAddress());
+ throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
}
}
else {
+ //new connection
+ context = connection.initContext(info);
clientIdSet.put(clientId, context);
}
- }
- connections.add(context.getConnection());
+ connections.add(connection);
- ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
- // do not distribute passwords in advisory messages. usernames okay
- ConnectionInfo copy = info.copy();
- copy.setPassword("");
- fireAdvisory(context, topic, copy);
- connectionInfos.put(copy.getConnectionId(), copy);
+ ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
+ // do not distribute passwords in advisory messages. usernames okay
+ ConnectionInfo copy = info.copy();
+ copy.setPassword("");
+ fireAdvisory(context, topic, copy);
- // init the conn
- addSessions(context.getConnection(), context.getConnectionState().getSessionIds());
+ // init the conn
+ context.getConnection().addSessions( context.getConnectionState().getSessionIds());
+ }
}
- private void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
+ public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {
this.fireAdvisory(context, topic, copy, null);
}
public BrokerId getBrokerId() {
+ // TODO: Use the Storage ID here...
if (brokerId == null) {
brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
}
@@ -347,7 +342,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
/*
* See AdvisoryBroker.fireAdvisory()
*/
- private void fireAdvisory(AMQConnectionContext context,
+ public void fireAdvisory(AMQConnectionContext context,
ActiveMQTopic topic,
Command command,
ConsumerId targetConsumerId) throws Exception {
@@ -372,13 +367,12 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
boolean originalFlowControl = context.isProducerFlowControl();
final AMQProducerBrokerExchange producerExchange = new AMQProducerBrokerExchange();
producerExchange.setConnectionContext(context);
- producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
AMQSession sess = context.getConnection().getAdvisorySession();
if (sess != null) {
- sess.send(producerExchange, advisoryMessage, false);
+ sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
}
}
finally {
@@ -392,220 +386,68 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH);
}
catch (Exception e) {
- brokerName = "localhost";
+ brokerName = server.getNodeID().toString();
}
}
return brokerName;
}
- public boolean isFaultTolerantConfiguration() {
- return false;
- }
-
- public void postProcessDispatch(MessageDispatch md) {
- // TODO Auto-generated method stub
-
- }
-
- public boolean isStopped() {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void preProcessDispatch(MessageDispatch messageDispatch) {
- // TODO Auto-generated method stub
+ protected ConnectionControl newConnectionControl() {
+ ConnectionControl control = new ConnectionControl();
- }
+ String uri = generateMembersURI(rebalanceClusterClients);
+ control.setConnectedBrokers(uri);
- public boolean isStopping() {
- return false;
+ control.setRebalanceConnection(rebalanceClusterClients);
+ return control;
}
- public void addProducer(OpenWireConnection theConn, ProducerInfo info) throws Exception {
- SessionId sessionId = info.getProducerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
- ConnectionState cs = theConn.getState();
- if (cs == null) {
- throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
- }
- SessionState ss = cs.getSessionState(sessionId);
- if (ss == null) {
- throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
- }
- // Avoid replaying dup commands
- if (!ss.getProducerIds().contains(info.getProducerId())) {
-
- AMQSession amqSession = sessions.get(sessionId);
- if (amqSession == null) {
- throw new IllegalStateException("Session not exist! : " + sessionId);
- }
+ private String generateMembersURI(boolean flip) {
+ String uri;
+ StringBuffer connectedBrokers = new StringBuffer();
+ String separator = "";
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
- if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
- throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
- }
- if (destination.isQueue()) {
- OpenWireUtil.validateDestination(destination, amqSession);
+ synchronized (members) {
+ if (members.size() > 0) {
+ for (TopologyMember member : members) {
+ connectedBrokers.append(separator).append(member.toURI());
+ separator = ",";
}
- DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
- this.addDestination(theConn, destInfo);
- }
-
- amqSession.createProducer(info);
-
- try {
- ss.addProducer(info);
- }
- catch (IllegalStateException e) {
- amqSession.removeProducer(info);
- }
-
- }
-
- }
-
- public void addConsumer(OpenWireConnection theConn, ConsumerInfo info) throws Exception {
- // Todo: add a destination interceptors holder here (amq supports this)
- SessionId sessionId = info.getConsumerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
- ConnectionState cs = theConn.getState();
- if (cs == null) {
- throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + connectionId);
- }
- SessionState ss = cs.getSessionState(sessionId);
- if (ss == null) {
- throw new IllegalStateException(this.server + " Cannot add a consumer to a session that had not been registered: " + sessionId);
- }
- // Avoid replaying dup commands
- if (!ss.getConsumerIds().contains(info.getConsumerId())) {
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
- if (theConn.getConsumerCount() >= theConn.getMaximumConsumersAllowedPerConnection()) {
- throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumConsumersAllowedPerConnection());
+ // The flip exists to guarantee even distribution of URIs when sent to the client
+ // in case of failures you won't get all the connections failing to a single server.
+ if (flip && members.size() > 1) {
+ members.addLast(members.removeFirst());
}
}
-
- AMQSession amqSession = sessions.get(sessionId);
- if (amqSession == null) {
- throw new IllegalStateException("Session not exist! : " + sessionId);
- }
-
- amqSession.createConsumer(info, amqSession);
-
- ss.addConsumer(info);
- }
- }
-
- public void addSessions(OpenWireConnection theConn, Set<SessionId> sessionSet) {
- Iterator<SessionId> iter = sessionSet.iterator();
- while (iter.hasNext()) {
- SessionId sid = iter.next();
- addSession(theConn, theConn.getState().getSessionState(sid).getInfo(), true);
}
- }
-
- public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss) {
- return addSession(theConn, ss, false);
- }
- public AMQSession addSession(OpenWireConnection theConn, SessionInfo ss, boolean internal) {
- AMQSession amqSession = new AMQSession(theConn.getState().getInfo(), ss, server, theConn, scheduledPool, this);
- amqSession.initialize();
- amqSession.setInternal(internal);
- sessions.put(ss.getSessionId(), amqSession);
- sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
- return amqSession;
+ uri = connectedBrokers.toString();
+ return uri;
}
- public void removeConnection(AMQConnectionContext context, ConnectionInfo info, Throwable error) {
- // todo roll back tx
- this.connections.remove(context.getConnection());
- this.connectionInfos.remove(info.getConnectionId());
- String clientId = info.getClientId();
- if (clientId != null) {
- this.clientIdSet.remove(clientId);
- }
+ public boolean isFaultTolerantConfiguration() {
+ return false;
}
- public void removeSession(AMQConnectionContext context, SessionInfo info) throws Exception {
- AMQSession session = sessions.remove(info.getSessionId());
- if (session != null) {
- session.close();
- }
- }
+ public void postProcessDispatch(MessageDispatch md) {
+ // TODO Auto-generated method stub
- public void removeProducer(ProducerId id) {
- SessionId sessionId = id.getParentId();
- AMQSession session = sessions.get(sessionId);
- session.removeProducer(id);
}
- public AMQSession getSession(SessionId sessionId) {
- return sessions.get(sessionId);
+ public boolean isStopped() {
+ // TODO Auto-generated method stub
+ return false;
}
- public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
- if (dest.isQueue()) {
- SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
- this.server.destroyQueue(qName);
- }
- else {
- Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
- Iterator<Binding> iterator = bindings.getBindings().iterator();
-
- while (iterator.hasNext()) {
- Queue b = (Queue) iterator.next().getBindable();
- if (b.getConsumerCount() > 0) {
- throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
- }
- if (b.isDurable()) {
- throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
- }
- b.deleteQueue();
- }
- }
-
- if (!AdvisorySupport.isAdvisoryTopic(dest)) {
- AMQConnectionContext context = connection.getConext();
- DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
+ public void preProcessDispatch(MessageDispatch messageDispatch) {
+ // TODO Auto-generated method stub
- ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
- fireAdvisory(context, topic, advInfo);
- }
}
- public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
- ActiveMQDestination dest = info.getDestination();
- if (dest.isQueue()) {
- SimpleString qName = OpenWireUtil.toCoreAddress(dest);
- QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
- if (binding == null) {
- if (connection.getState().getInfo() != null) {
-
- CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE;
- server.getSecurityStore().check(qName, checkType, connection);
-
- server.checkQueueCreationLimit(connection.getUsername());
- }
- ConnectionInfo connInfo = connection.getState().getInfo();
- this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
- }
- if (dest.isTemporary()) {
- connection.registerTempQueue(dest);
- }
- }
-
- if (!AdvisorySupport.isAdvisoryTopic(dest)) {
- AMQConnectionContext context = connection.getConext();
- DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, dest);
-
- ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
- fireAdvisory(context, topic, advInfo);
- }
+ public boolean isStopping() {
+ return false;
}
-
public void endTransaction(TransactionInfo info) throws Exception {
AMQSession txSession = transactions.get(info.getTransactionId());
@@ -645,20 +487,15 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
if (txSession != null) {
txSession.rollback(info);
}
- transactions.remove(info.getTransactionId());
- }
-
- public TransactionId[] recoverTransactions(Set<SessionId> sIds) {
- List<TransactionId> recovered = new ArrayList<>();
- if (sIds != null) {
- for (SessionId sid : sIds) {
- AMQSession s = this.sessions.get(sid);
- if (s != null) {
- s.recover(recovered);
- }
- }
+ else if (info.getTransactionId().isLocalTransaction()) {
+ //during a broker restart, recovered local transaction may not be registered
+ //in that case we ignore and let the tx removed silently by connection.
+ //see AMQ1925Test.testAMQ1925_TXBegin
+ }
+ else {
+ throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
}
- return recovered.toArray(new TransactionId[0]);
+ transactions.remove(info.getTransactionId());
}
public boolean validateUser(String login, String passcode) {
@@ -681,64 +518,63 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
transactions.remove(xid);
}
+ /**
+ * TODO: remove this, use the regular ResourceManager from the Server's
+ */
public void registerTx(TransactionId txId, AMQSession amqSession) {
transactions.put(txId, amqSession);
}
- //advisory support
- @Override
- public void onNotification(Notification notif) {
- try {
- if (notif.getType() instanceof CoreNotificationType) {
- CoreNotificationType type = (CoreNotificationType) notif.getType();
- switch (type) {
- case CONSUMER_SLOW:
- fireSlowConsumer(notif);
- break;
- default:
- break;
- }
- }
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
- }
- }
-
- private void fireSlowConsumer(Notification notif) throws Exception {
- SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
- Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
- SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
- AMQSession session = sessions.get(sessionId);
- AMQConsumer consumer = session.getConsumer(coreConsumerId);
- ActiveMQDestination destination = consumer.getDestination();
-
- if (!AdvisorySupport.isAdvisoryTopic(destination)) {
- ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
- ConnectionId connId = sessionId.getParentId();
- OpenWireConnection cc = this.brokerConnectionStates.get(connId);
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
-
- fireAdvisory(cc.getConext(), topic, advisoryMessage, consumer.getId());
- }
- }
-
public void removeSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
}
- public void sendBrokerInfo(OpenWireConnection connection) {
+ public void sendBrokerInfo(OpenWireConnection connection) throws Exception {
BrokerInfo brokerInfo = new BrokerInfo();
- brokerInfo.setBrokerName(server.getIdentity());
- brokerInfo.setBrokerId(new BrokerId(server.getNodeID().toString()));
+ brokerInfo.setBrokerName(getBrokerName());
+ brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID()));
brokerInfo.setPeerBrokerInfos(null);
brokerInfo.setFaultTolerantConfiguration(false);
brokerInfo.setBrokerURL(connection.getLocalAddress());
//cluster support yet to support
brokerInfo.setPeerBrokerInfos(null);
- connection.dispatchAsync(brokerInfo);
+ connection.dispatch(brokerInfo);
+ }
+
+ public void setRebalanceClusterClients(boolean rebalance) {
+ this.rebalanceClusterClients = rebalance;
+ }
+
+ public boolean isRebalanceClusterClients() {
+ return this.rebalanceClusterClients;
+ }
+
+ public void setUpdateClusterClients(boolean updateClusterClients) {
+ this.updateClusterClients = updateClusterClients;
}
+
+ public boolean isUpdateClusterClients() {
+ return this.updateClusterClients;
+ }
+
+ public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
+ this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
+ }
+
+ public boolean isUpdateClusterClientsOnRemove() {
+ return this.updateClusterClientsOnRemove;
+ }
+
+ public void setBrokerName(String name) {
+ this.brokerName = name;
+ }
+
+ public static XAException newXAException(String s, int errorCode) {
+ XAException xaException = new XAException(s + " " + "xaErrorCode:" + errorCode);
+ xaException.errorCode = errorCode;
+ return xaException;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
index d684761..4513eb3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
@@ -18,16 +18,12 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.api.core.SimpleString;
public class OpenWireUtil {
@@ -64,23 +60,6 @@ public class OpenWireUtil {
}
}
- /**
- * Checks to see if this destination exists. If it does not throw an invalid destination exception.
- *
- * @param destination
- * @param amqSession
- */
- public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception {
- if (destination.isQueue()) {
- AMQServerSession coreSession = amqSession.getCoreSession();
- SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
- BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
- if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
- throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
- }
- }
- }
-
/*
*This util converts amq wildcards to compatible core wildcards
*The conversion is like this:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
deleted file mode 100644
index 0e21ca4..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/SendingResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-
-public class SendingResult {
-
- private boolean blockNextSend;
- private PagingStoreImpl blockPagingStore;
- private SimpleString blockingAddress;
-
- public void setBlockNextSend(boolean block) {
- this.blockNextSend = block;
- }
-
- public boolean isBlockNextSend() {
- return this.blockNextSend;
- }
-
- public void setBlockPagingStore(PagingStoreImpl store) {
- this.blockPagingStore = store;
- }
-
- public PagingStoreImpl getBlockPagingStore() {
- return this.blockPagingStore;
- }
-
- public void setBlockingAddress(SimpleString address) {
- this.blockingAddress = address;
- }
-
- public SimpleString getBlockingAddress() {
- return this.blockingAddress;
- }
-
- public boolean isSendFailIfNoSpace() {
- AddressFullMessagePolicy policy = this.blockPagingStore.getAddressFullMessagePolicy();
- return policy == AddressFullMessagePolicy.FAIL;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
index 7e83767..56b4b6d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java
@@ -20,15 +20,20 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessagePull;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange {
private final Map<ActiveMQDestination, AMQConsumer> consumerMap;
- public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, Map<ActiveMQDestination, AMQConsumer> consumerMap) {
+ public AMQCompositeConsumerBrokerExchange(AMQSession amqSession, List<AMQConsumer> consumerList) {
super(amqSession);
- this.consumerMap = consumerMap;
+ this.consumerMap = new HashMap<>();
+ for (AMQConsumer consumer : consumerList) {
+ consumerMap.put(consumer.getOpenwireDestination(), consumer);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
index a79911c..8071d04 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java
@@ -17,9 +17,11 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.WireFormatInfo;
@@ -47,6 +49,8 @@ public class AMQConnectionContext {
private boolean clientMaster = true;
private ConnectionState connectionState;
private XATransactionId xid;
+ private AtomicInteger refCount = new AtomicInteger(1);
+ private Command lastCommand;
public AMQConnectionContext() {
this.messageEvaluationContext = new MessageEvaluationContext();
@@ -248,4 +252,19 @@ public class AMQConnectionContext {
return false;
}
+ public void incRefCount() {
+ refCount.incrementAndGet();
+ }
+
+ public int decRefCount() {
+ return refCount.decrementAndGet();
+ }
+
+ public void setLastCommand(Command lastCommand) {
+ this.lastCommand = lastCommand;
+ }
+
+ public Command getLastCommand() {
+ return this.lastCommand;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 7da1f3e..ef9b2a8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,7 +27,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
@@ -36,23 +44,15 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-
-public class AMQConsumer implements BrowserListener {
+public class AMQConsumer {
private AMQSession session;
- private org.apache.activemq.command.ActiveMQDestination actualDest;
+ private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private long nativeId = -1;
- private SimpleString subQueueName = null;
- private final int prefetchSize;
+ private int prefetchSize;
private AtomicInteger windowAvailable;
private final java.util.Queue<MessageInfo> deliveringRefs = new ConcurrentLinkedQueue<>();
private long messagePullSequence = 0;
@@ -63,7 +63,7 @@ public class AMQConsumer implements BrowserListener {
ConsumerInfo info,
ScheduledExecutorService scheduledPool) {
this.session = amqSession;
- this.actualDest = d;
+ this.openwireDestination = d;
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
@@ -73,75 +73,102 @@ public class AMQConsumer implements BrowserListener {
}
}
- public void init() throws Exception {
- AMQServerSession coreSession = session.getCoreSession();
+ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
+ this.nativeId = nativeId;
+ AMQServerConsumer serverConsumer = createServerConsumer(info, slowConsumerDetectionListener);
+ serverConsumer.setAmqConsumer(this);
+ }
+
+
+ private AMQServerConsumer createServerConsumer(ConsumerInfo info, SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
- nativeId = session.getCoreServer().getStorageManager().generateID();
+ String physicalName = OpenWireUtil.convertWildcard(openwireDestination.getPhysicalName());
+
+ SimpleString address;
+
+ if (openwireDestination.isTopic()) {
+ address = new SimpleString("jms.topic." + physicalName);
- SimpleString address = new SimpleString(this.actualDest.getPhysicalName());
+ SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
- if (this.actualDest.isTopic()) {
- String physicalName = this.actualDest.getPhysicalName();
- if (physicalName.contains(".>")) {
- //wildcard
- physicalName = OpenWireUtil.convertWildcard(physicalName);
+ AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
+ serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+ return serverConsumer;
+ }
+ else {
+ SimpleString queueName = new SimpleString("jms.queue." + physicalName);
+ AMQServerConsumer serverConsumer = (AMQServerConsumer) session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
+ serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
+ AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
+ if (addrSettings != null) {
+ //see PolicyEntry
+ if (info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
+ //sends back a ConsumerControl
+ ConsumerControl cc = new ConsumerControl();
+ cc.setConsumerId(info.getConsumerId());
+ cc.setPrefetch(0);
+ session.getConnection().dispatch(cc);
+ }
}
- // on recreate we don't need to create queues
- address = new SimpleString("jms.topic." + physicalName);
- if (info.isDurable()) {
- subQueueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, info.getClientId(), info.getSubscriptionName()));
-
- QueueQueryResult result = coreSession.executeQueueQuery(subQueueName);
- if (result.isExists()) {
- // Already exists
- if (result.getConsumerCount() > 0) {
- throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
- }
+ return serverConsumer;
- SimpleString oldFilterString = result.getFilterString();
+ }
- boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
+ }
- SimpleString oldTopicName = result.getAddress();
+ private SimpleString createTopicSubscription(boolean isDurable,
+ String clientID,
+ String physicalName,
+ String subscriptionName,
+ SimpleString selector,
+ SimpleString address) throws Exception {
+
+ SimpleString queueName;
+
+ if (isDurable) {
+ queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
+ QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
+ if (result.isExists()) {
+ // Already exists
+ if (result.getConsumerCount() > 0) {
+ throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+ }
- boolean topicChanged = !oldTopicName.equals(address);
+ SimpleString oldFilterString = result.getFilterString();
- if (selectorChanged || topicChanged) {
- // Delete the old durable sub
- coreSession.deleteQueue(subQueueName);
+ boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals(selector);
- // Create the new one
- coreSession.createQueue(address, subQueueName, selector, false, true);
- }
+ SimpleString oldTopicName = result.getAddress();
- }
- else {
- coreSession.createQueue(address, subQueueName, selector, false, true);
+ boolean topicChanged = !oldTopicName.equals(address);
+
+ if (selectorChanged || topicChanged) {
+ // Delete the old durable sub
+ session.getCoreSession().deleteQueue(queueName);
+
+ // Create the new one
+ session.getCoreSession().createQueue(address, queueName, selector, false, true);
}
}
else {
- subQueueName = new SimpleString(UUID.randomUUID().toString());
-
- coreSession.createQueue(address, subQueueName, selector, true, false);
+ session.getCoreSession().createQueue(address, queueName, selector, false, true);
}
-
- coreSession.createConsumer(nativeId, subQueueName, null, info.isBrowser(), false, -1);
}
else {
- SimpleString queueName = new SimpleString("jms.queue." + this.actualDest.getPhysicalName());
- coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
- }
+ queueName = new SimpleString(UUID.randomUUID().toString());
+
+ session.getCoreSession().createQueue(address, queueName, selector, true, false);
- if (info.isBrowser()) {
- AMQServerConsumer coreConsumer = coreSession.getConsumer(nativeId);
- coreConsumer.setBrowserListener(this);
}
+ return queueName;
}
+
+
public long getNativeId() {
return this.nativeId;
}
@@ -189,7 +216,7 @@ public class AMQConsumer implements BrowserListener {
public void handleDeliverNullDispatch() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(getId());
- md.setDestination(actualDest);
+ md.setDestination(openwireDestination);
session.deliverMessage(md);
windowAvailable.decrementAndGet();
}
@@ -210,9 +237,16 @@ public class AMQConsumer implements BrowserListener {
mi = iter.next();
if (mi.amqId.equals(lastm)) {
n++;
- iter.remove();
- session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
- session.getCoreSession().commit();
+ if (!isLocalTx) {
+ iter.remove();
+ session.getCoreSession().individualAcknowledge(nativeId, mi.nativeId);
+ }
+ else {
+ mi.setLocalAcked(true);
+ }
+ if (tid == null) {
+ session.getCoreSession().commit();
+ }
break;
}
}
@@ -220,7 +254,7 @@ public class AMQConsumer implements BrowserListener {
else if (ack.isRedeliveredAck()) {
//client tells that this message is for redlivery.
//do nothing until poisoned.
- n = 1;
+ n = ack.getMessageCount();
}
else if (ack.isPoisonAck()) {
//send to dlq
@@ -251,7 +285,7 @@ public class AMQConsumer implements BrowserListener {
}
else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
//ToDo: implement with tests
- n = 1;
+ n = ack.getMessageCount();
}
else {
Iterator<MessageInfo> iter = deliveringRefs.iterator();
@@ -294,7 +328,6 @@ public class AMQConsumer implements BrowserListener {
acquireCredit(n);
}
- @Override
public void browseFinished() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(info.getConsumerId());
@@ -304,11 +337,6 @@ public class AMQConsumer implements BrowserListener {
session.deliverMessage(md);
}
- public boolean handledTransactionalMsg() {
- // TODO Auto-generated method stub
- return false;
- }
-
//this is called before session commit a local tx
public void finishTx() throws Exception {
MessageInfo lastMi = null;
@@ -346,10 +374,6 @@ public class AMQConsumer implements BrowserListener {
}
}
- public org.apache.activemq.command.ActiveMQDestination getDestination() {
- return actualDest;
- }
-
public ConsumerInfo getInfo() {
return info;
}
@@ -370,10 +394,22 @@ public class AMQConsumer implements BrowserListener {
session.removeConsumer(nativeId);
}
- public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
- return actualDest;
+ public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
+ return openwireDestination;
+ }
+
+ public void setPrefetchSize(int prefetchSize) {
+ this.prefetchSize = prefetchSize;
+ this.windowAvailable.set(prefetchSize);
+ this.info.setPrefetchSize(prefetchSize);
+ if (this.prefetchSize > 0) {
+ session.getCoreSession().promptDelivery(nativeId);
+ }
}
+ /**
+ * The MessagePullHandler is used with slow consumer policies.
+ * */
private class MessagePullHandler {
private long next = -1;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
index 800ee3f..21a45b1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerBrokerExchange.java
@@ -22,41 +22,11 @@ import org.apache.activemq.command.MessagePull;
public abstract class AMQConsumerBrokerExchange {
protected final AMQSession amqSession;
- private AMQConnectionContext connectionContext;
- private boolean wildcard;
public AMQConsumerBrokerExchange(AMQSession amqSession) {
this.amqSession = amqSession;
}
- /**
- * @return the connectionContext
- */
- public AMQConnectionContext getConnectionContext() {
- return this.connectionContext;
- }
-
- /**
- * @param connectionContext the connectionContext to set
- */
- public void setConnectionContext(AMQConnectionContext connectionContext) {
- this.connectionContext = connectionContext;
- }
-
- /**
- * @return the wildcard
- */
- public boolean isWildcard() {
- return this.wildcard;
- }
-
- /**
- * @param wildcard the wildcard to set
- */
- public void setWildcard(boolean wildcard) {
- this.wildcard = wildcard;
- }
-
public abstract void acknowledge(MessageAck ack) throws Exception;
public abstract void processMessagePull(MessagePull messagePull) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java
deleted file mode 100644
index 848325e..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-
-public class AMQProducer {
-
- private AMQSession amqSession;
- private ProducerInfo info;
-
- public AMQProducer(AMQSession amqSession, ProducerInfo info) {
- this.amqSession = amqSession;
- this.info = info;
- }
-
- public void init() throws Exception {
- // If the destination is specified check that it exists.
- if (info.getDestination() != null) {
- OpenWireUtil.validateDestination(info.getDestination(), amqSession);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
index f94c119..220c7fc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQProducerBrokerExchange.java
@@ -16,34 +16,16 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
import org.apache.activemq.state.ProducerState;
public class AMQProducerBrokerExchange {
private AMQConnectionContext connectionContext;
private ProducerState producerState;
- private boolean mutable = true;
- private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
- private boolean auditProducerSequenceIds;
- private boolean isNetworkProducer;
- private final FlowControlInfo flowControlInfo = new FlowControlInfo();
public AMQProducerBrokerExchange() {
}
- public AMQProducerBrokerExchange copy() {
- AMQProducerBrokerExchange rc = new AMQProducerBrokerExchange();
- rc.connectionContext = connectionContext.copy();
- rc.producerState = producerState;
- rc.mutable = mutable;
- return rc;
- }
-
/**
* @return the connectionContext
*/
@@ -59,20 +41,6 @@ public class AMQProducerBrokerExchange {
}
/**
- * @return the mutable
- */
- public boolean isMutable() {
- return this.mutable;
- }
-
- /**
- * @param mutable the mutable to set
- */
- public void setMutable(boolean mutable) {
- this.mutable = mutable;
- }
-
- /**
* @return the producerState
*/
public ProducerState getProducerState() {
@@ -86,119 +54,6 @@ public class AMQProducerBrokerExchange {
this.producerState = producerState;
}
- /**
- * Enforce duplicate suppression using info from persistence adapter
- *
- * @return false if message should be ignored as a duplicate
- */
- public boolean canDispatch(Message messageSend) {
- boolean canDispatch = true;
- if (auditProducerSequenceIds && messageSend.isPersistent()) {
- final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
- if (isNetworkProducer) {
- // messages are multiplexed on this producer so we need to query the
- // persistenceAdapter
- long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
- if (producerSequenceId <= lastStoredForMessageProducer) {
- canDispatch = false;
- }
- }
- else if (producerSequenceId <= lastSendSequenceNumber.get()) {
- canDispatch = false;
- if (messageSend.isInTransaction()) {
- }
- else {
- }
- }
- else {
- // track current so we can suppress duplicates later in the stream
- lastSendSequenceNumber.set(producerSequenceId);
- }
- }
- return canDispatch;
- }
-
- private long getStoredSequenceIdForMessage(MessageId messageId) {
- return -1;
- }
-
public void setLastStoredSequenceId(long l) {
}
-
- public void incrementSend() {
- flowControlInfo.incrementSend();
- }
-
- public void blockingOnFlowControl(boolean blockingOnFlowControl) {
- flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
- }
-
- public boolean isBlockedForFlowControl() {
- return flowControlInfo.isBlockingOnFlowControl();
- }
-
- public void resetFlowControl() {
- flowControlInfo.reset();
- }
-
- public long getTotalTimeBlocked() {
- return flowControlInfo.getTotalTimeBlocked();
- }
-
- public int getPercentageBlocked() {
- double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
- return (int) value * 100;
- }
-
- public static class FlowControlInfo {
-
- private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
- private AtomicLong totalSends = new AtomicLong();
- private AtomicLong sendsBlocked = new AtomicLong();
- private AtomicLong totalTimeBlocked = new AtomicLong();
-
- public boolean isBlockingOnFlowControl() {
- return blockingOnFlowControl.get();
- }
-
- public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
- this.blockingOnFlowControl.set(blockingOnFlowControl);
- if (blockingOnFlowControl) {
- incrementSendBlocked();
- }
- }
-
- public long getTotalSends() {
- return totalSends.get();
- }
-
- public void incrementSend() {
- this.totalSends.incrementAndGet();
- }
-
- public long getSendsBlocked() {
- return sendsBlocked.get();
- }
-
- public void incrementSendBlocked() {
- this.sendsBlocked.incrementAndGet();
- }
-
- public long getTotalTimeBlocked() {
- return totalTimeBlocked.get();
- }
-
- public void incrementTimeBlocked(long time) {
- this.totalTimeBlocked.addAndGet(time);
- }
-
- public void reset() {
- blockingOnFlowControl.set(false);
- totalSends.set(0);
- sendsBlocked.set(0);
- totalTimeBlocked.set(0);
-
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index 3e7afa5..2f9d0bc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -23,8 +23,6 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -34,6 +32,18 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class AMQServerConsumer extends ServerConsumerImpl {
+ // TODO-NOW: remove this once unified
+ AMQConsumer amqConsumer;
+
+ public AMQConsumer getAmqConsumer() {
+ return amqConsumer;
+ }
+
+ /** TODO-NOW: remove this once unified */
+ public void setAmqConsumer(AMQConsumer amqConsumer) {
+ this.amqConsumer = amqConsumer;
+ }
+
public AMQServerConsumer(long consumerID,
AMQServerSession serverSession,
QueueBinding binding,
@@ -51,81 +61,6 @@ public class AMQServerConsumer extends ServerConsumerImpl {
super(consumerID, serverSession, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
}
- public void setBrowserListener(BrowserListener listener) {
- AMQBrowserDeliverer newBrowserDeliverer = new AMQBrowserDeliverer(this.browserDeliverer);
- newBrowserDeliverer.listener = listener;
- this.browserDeliverer = newBrowserDeliverer;
- }
-
- private class AMQBrowserDeliverer extends BrowserDeliverer {
-
- private BrowserListener listener = null;
-
- public AMQBrowserDeliverer(final BrowserDeliverer other) {
- super(other.iterator);
- }
-
- @Override
- public synchronized void run() {
- // if the reference was busy during the previous iteration, handle it now
- if (current != null) {
- try {
- HandleStatus status = handle(current);
-
- if (status == HandleStatus.BUSY) {
- return;
- }
-
- if (status == HandleStatus.HANDLED) {
- proceedDeliver(current);
- }
-
- current = null;
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current);
- return;
- }
- }
-
- MessageReference ref = null;
- HandleStatus status;
-
- while (true) {
- try {
- ref = null;
- synchronized (messageQueue) {
- if (!iterator.hasNext()) {
- //here we need to send a null for amq browsers
- if (listener != null) {
- listener.browseFinished();
- }
- break;
- }
-
- ref = iterator.next();
-
- status = handle(ref);
- }
-
- if (status == HandleStatus.HANDLED) {
- proceedDeliver(ref);
- }
- else if (status == HandleStatus.BUSY) {
- // keep a reference on the current message reference
- // to handle it next time the browser deliverer is executed
- current = ref;
- break;
- }
- }
- catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref);
- break;
- }
- }
- }
- }
-
public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
synchronized (this.deliveringRefs) {
for (MessageReference ref : refs) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index 0a3804c..3f0259d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl {
@Override
protected void doClose(final boolean failed) throws Exception {
+ Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
+ for (ServerConsumer consumer : consumersClone) {
+ AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
+ amqConsumer.setStarted(false);
+ }
+
synchronized (this) {
if (tx != null && tx.getXid() == null) {
((AMQTransactionImpl) tx).setRollbackForClose();
@@ -143,6 +150,8 @@ public class AMQServerSession extends ServerSessionImpl {
}
//amq specific behavior
+
+ // TODO: move this to AMQSession
public void amqRollback(Set<Long> acked) throws Exception {
if (tx == null) {
// Might be null if XA
@@ -218,7 +227,9 @@ public class AMQServerSession extends ServerSessionImpl {
final boolean supportLargeMessage,
final Integer credits) throws Exception {
if (this.internal) {
- //internal sessions doesn't check security
+ // Clebert TODO: PQP!!!!!!!!!!!!!!!!!!!!
+
+ // internal sessions don't check security. Why? What's the reason for that? Where is a link?
Binding binding = postOffice.getBinding(queueName);
@@ -309,6 +320,8 @@ public class AMQServerSession extends ServerSessionImpl {
return queue;
}
+
+ // Clebert TODO: Get rid of these methods
@Override
protected void doSend(final ServerMessage msg, final boolean direct) throws Exception {
if (!this.internal) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
index 9ce21e3..a6ca4a0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -32,6 +32,15 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public class AMQServerSessionFactory implements ServerSessionFactory {
+ private static final AMQServerSessionFactory singleInstance = new AMQServerSessionFactory();
+
+ public static AMQServerSessionFactory getInstance() {
+ return singleInstance;
+ }
+
+ private AMQServerSessionFactory() {
+ }
+
@Override
public ServerSessionImpl createCoreSession(String name,
String username,