You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/15 18:28:54 UTC
svn commit: r547730 [2/9] - in /incubator/qpid/trunk/qpid: ./ java/
java/broker/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/
java/broker/src/main/java/org/apache/qpid/server/queue/ ...
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=547730&r1=547729&r2=547730
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jun 15 09:28:46 2007
@@ -20,47 +20,15 @@
*/
package org.apache.qpid.client;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQInvalidArgumentException;
-import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverNoopSupport;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.JMSMapMessage;
@@ -70,21 +38,20 @@
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AccessRequestBody;
-import org.apache.qpid.framing.AccessRequestOkBody;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -92,413 +59,618 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+ /** Used for debugging. */
private static final Logger _logger = Logger.getLogger(AMQSession.class);
+ /** Used for debugging in the dispatcher. */
+ private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+
+ /** The default maximum number of prefetched messages at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+
+ /** The default minimum number of prefetched messages at which to resume the channel. */
public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
+ /**
+ * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
+ * not need to be attached to a queue.
+ */
+ protected static final boolean DEFAULT_IMMEDIATE = false;
+
+ /**
+ * The default value for mandatory flag used by producers created by this session is true. That is, server will not
+ * silently drop messages where no queue is connected to the exchange for the message.
+ */
+ protected static final boolean DEFAULT_MANDATORY = true;
+
+ /** System property to enable strict AMQP compliance. */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
+
+ /** Strict AMQP default setting. */
+ public static final String STRICT_AMQP_DEFAULT = "false";
+
+ /** System property to enable failure if strict AMQP compliance is violated. */
+ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+
+ /** Strict AMQP failure default. */
+ public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+ /** System property to enable immediate message prefetching. */
+ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+
+ /** Immediate message prefetch default. */
+ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
+ /** The connection to which this session belongs. */
private AMQConnection _connection;
+ /** Used to indicate whether or not this is a transactional session. */
private boolean _transacted;
+ /** Holds the sessions acknowledgement mode. */
private int _acknowledgeMode;
+ /** Holds this session unique identifier, used to distinguish it from other sessions. */
private int _channelId;
+ /** @todo This does not appear to be set? */
private int _ticket;
+ /** Holds the high mark for prefetched messages, at which the session is suspended. */
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+
+ /** Holds the low mark for prefetched messages, below which the session is resumed. */
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ /** Holds the message listener, if any, which is attached to this session. */
private MessageListener _messageListener = null;
+ /** Used to indicate that this session has been started at least once. */
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled correctly. Note this only
- * keeps a record of subscriptions which have been created in the current instance. It does not remember
- * subscriptions between executions of the client
+ * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client.
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+
+ /**
+ * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked up
+ * in the {@link #_subscriptions} map.
+ */
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
- /** Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. */
- private int _nextTag = 1;
-
- /** This queue is bounded and is used to store messages before being dispatched to the consumer */
+ /**
+ * Used to hold incoming messages.
+ *
+ * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
+ */
private final FlowControllingBlockingQueue _queue;
+ /** Holds the dispatcher thread for this session. */
private Dispatcher _dispatcher;
+ /** Holds the message factory factory for this session. */
private MessageFactoryRegistry _messageFactoryRegistry;
- /** Set of all producers created by this session */
- private Map _producers = new ConcurrentHashMap();
-
- /** Maps from consumer tag (String) to JMSMessageConsumer instance */
- private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
-
- /** Maps from destination to count of JMSMessageConsumers */
- private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
- new ConcurrentHashMap<Destination, AtomicInteger>();
+ /** Holds all of the producers created by this session, keyed by their unique identifiers. */
+ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
/**
- * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
- * need to be attached to a queue
+ * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
+ * methods.
*/
- protected static final boolean DEFAULT_IMMEDIATE = false;
+ private int _nextTag = 1;
/**
- * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently
- * drop messages where no queue is connected to the exchange for the message
+ * Maps from identifying tags to message consumers, in order to dispatch incoming messages to the right
+ * consumer.
*/
- protected static final boolean DEFAULT_MANDATORY = true;
+ private Map<AMQShortString, BasicMessageConsumer> _consumers =
+ new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+
+ /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
+ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+ new ConcurrentHashMap<Destination, AtomicInteger>();
/**
- * The counter of the next producer id. This id is generated by the session and used only to allow the producer to
- * identify itself to the session when deregistering itself. <p/> Access to this id does not require to be
- * synchronized since according to the JMS specification only one thread of control is allowed to create producers
- * for any given session instance.
+ * Used as a source of unique identifiers for producers within the session.
+ *
+ * <p/> Access to this id does not need to be synchronized since according to the JMS specification only one
+ * thread of control is allowed to create producers for any given session instance.
*/
private long _nextProducerId;
-
/**
* Set when recover is called. This is to handle the case where recover() is called by application code during
- * onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ * onMessage() processing to ensure that an auto ack is not sent.
*/
private boolean _inRecovery;
+ /** Used to indicate that the connection to which this session belongs has been stopped. */
private boolean _connectionStopped;
+ /** Used to indicate that this session has a message listener attached to it. */
private boolean _hasMessageListeners;
+ /** Used to indicate that this session has been suspended. */
private boolean _suspended;
+ /**
+ * Used to protect the suspension of this session, so that critical code can be executed during suspension, without
+ * the session being resumed by other threads.
+ */
private final Object _suspensionLock = new Object();
- /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
+ /**
+ * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
+ *
+ * @todo This is accessed only within a synchronized method, so does not need to be atomic.
+ */
private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
- /** System property to enable strickt AMQP compliance */
- public static final String STRICT_AMQP = "STRICT_AMQP";
- /** Strickt AMQP default */
- public static final String STRICT_AMQP_DEFAULT = "false";
+ /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
+ private final boolean _immediatePrefetch;
+ /** Indicates that warnings should be generated on violations of the strict AMQP. */
private final boolean _strictAMQP;
- /** System property to enable strickt AMQP compliance */
- public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
- /** Strickt AMQP default */
- public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
-
+ /** Indicates that runtime exceptions should be generated on violations of the strict AMQP. */
private final boolean _strictAMQPFATAL;
- /** System property to enable immediate message prefetching */
- public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
- /** Immediate message prefetch default */
- public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+ /**
+ * Creates a new session on a connection.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknowledgement mode for the session.
+ * @param messageFactoryRegistry The message factory factory for the session.
+ * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
+ * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ */
+ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ {
- private final boolean _immediatePrefetch;
+ _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+ _strictAMQPFATAL =
+ Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
+ _immediatePrefetch =
+ _strictAMQP
+ || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
- private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+ _connection = con;
+ _transacted = transacted;
+ if (transacted)
+ {
+ _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
+ }
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
- /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
- private class Dispatcher extends Thread
- {
+ _channelId = channelId;
+ _messageFactoryRegistry = messageFactoryRegistry;
+ _defaultPrefetchHighMark = defaultPrefetchHighMark;
+ _defaultPrefetchLowMark = defaultPrefetchLowMark;
- /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _queue =
+ new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue.ThresholdListener()
+ {
+ public void aboveThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(true)).start();
+ }
+ }
+
+ public void underThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.debug(
+ "Below threshold(" + _defaultPrefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(false)).start();
+ }
+ }
+ });
+ }
+ else
+ {
+ _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ }
+ }
- private final Object _lock = new Object();
+ /**
+ * Creates a new session on a connection with the default message factory factory.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknowledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ */
+ AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+ int defaultPrefetchLow)
+ {
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+ defaultPrefetchLow);
+ }
- public Dispatcher()
+ /**
+ * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
+ *
+ * @throws IllegalStateException If the session is closed.
+ */
+ public void acknowledge() throws IllegalStateException
+ {
+ if (isClosed())
{
- super("Dispatcher-Channel-" + _channelId);
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " created");
- }
+ throw new IllegalStateException("Session is already closed");
}
- public void run()
+ for (BasicMessageConsumer consumer : _consumers.values())
{
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " started");
- }
+ consumer.acknowledge();
+ }
+ }
- UnprocessedMessage message;
+ /**
+ * Acknowledge one or many messages.
+ *
+ * @param deliveryTag The tag of the last message to be acknowledged.
+ * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
+ * delivery tag, <tt>false</tt> to just acknowledge that message.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ {
+ final AMQFrame ackFrame =
+ BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ multiple);
- // Allow disptacher to start stopped
- synchronized (_lock)
- {
- while (connectionStopped())
- {
- try
- {
- _lock.wait();
- }
- catch (InterruptedException e)
- {
- // ignore
- }
- }
- }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ }
- try
- {
- while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
- {
- synchronized (_lock)
- {
+ getProtocolHandler().writeFrame(ackFrame);
+ }
- while (connectionStopped())
- {
- _lock.wait();
- }
+ /**
+ * Binds the named queue, with the specified routing key, to the named exchange.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param queueName The name of the queue to bind.
+ * @param routingKey The routing key to bind the queue with.
+ * @param arguments Additional arguments.
+ * @param exchangeName The exchange to bind the queue on.
+ *
+ * @throws AMQException If the queue cannot be bound for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
+ */
+ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName) throws AMQException
+ {
+ /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ AMQFrame queueBind =
+ QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ arguments, // arguments
+ exchangeName, // exchange
+ false, // nowait
+ queueName, // queue
+ routingKey, // routingKey
+ getTicket()); // ticket
- dispatchMessage(message);
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
- while (connectionStopped())
- {
- _lock.wait();
- }
+ return null;
+ }
+ }, _connection).execute();
+ }
- }
-
- }
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
- }
- }
+ /**
+ * Closes the session with no timeout.
+ *
+ * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+ */
+ public void close() throws JMSException
+ {
+ close(-1);
+ }
- // only call while holding lock
- final boolean connectionStopped()
+ /**
+ * Closes the session.
+ *
+ * <p/>Note that this operation succeeds automatically if a fail-over interrupts the synchronous request to close the
+ * channel. This is because the channel is marked as closed before the request to close it is made, so the fail-over
+ * should not re-open it.
+ *
+ * @param timeout The timeout in milliseconds to wait for the session close acknowledgement from the broker.
+ *
+ * @throws JMSException If the JMS provider fails to close the session due to some internal error.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be re-opened. May
+ * need to examine this more carefully.
+ * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, because
+ * the failover process sends the failover event before acquiring the mutex itself.
+ */
+ public void close(long timeout) throws JMSException
+ {
+ if (_logger.isInfoEnabled())
{
- return _connectionStopped;
+ _logger.info("Closing session: " + this + ":"
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
- boolean setConnectionStopped(boolean connectionStopped)
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (_connection.getFailoverMutex())
{
- boolean currently;
- synchronized (_lock)
+ // Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
- currently = _connectionStopped;
- _connectionStopped = connectionStopped;
- _lock.notify();
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
- if (_dispatcherLogger.isDebugEnabled())
+ try
{
- _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") +
- ": Currently " + (currently ? "Stopped" : "Started"));
- }
- }
- return currently;
- }
- private void dispatchMessage(UnprocessedMessage message)
- {
- if (message.getDeliverBody() != null)
- {
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+ getProtocolHandler().closeSession(this);
+
+ final AMQFrame frame =
+ ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client closing channel")); // replyText
+
+ getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
- if (consumer == null || consumer.isClosed())
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
+ catch (AMQException e)
{
- if (_dispatcherLogger.isInfoEnabled())
- {
- if (consumer == null)
- {
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
- "[" + message.getDeliverBody().deliveryTag + "] from queue " +
- message.getDeliverBody().consumerTag +
- " )without a handler - rejecting(requeue)...");
- }
- else
- {
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
- "[" + message.getDeliverBody().deliveryTag + "] from queue " +
- " consumer(" + consumer.debugIdentity() +
- ") is closed rejecting(requeue)...");
- }
- }
- // Don't reject if we're already closing
- if (!_closed.get())
- {
- rejectMessage(message, true);
- }
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
}
- else
+ // This is ignored because the channel is already marked as closed so the fail-over process will
+ // not re-open it.
+ catch (FailoverException e)
{
- consumer.notifyMessage(message, _channelId);
+ _logger.debug(
+ "Got FailoverException during channel close, ignored as channel already marked as closed.");
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
}
}
}
+ }
- public void close()
+ /**
+ * Called when the server initiates the closure of the session unilaterally.
+ *
+ * @param e the exception that caused this session to be closed. Null (or any non-AMQException) is wrapped in a generic "Closing session forcibly" AMQException.
+ */
+ public void closed(Throwable e) throws JMSException
+ {
+ synchronized (_connection.getFailoverMutex())
{
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
_closed.set(true);
- interrupt();
-
- //fixme awaitTermination
+ AMQException amqe;
+ if (e instanceof AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException(null, "Closing session forcibly", e);
+ }
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
+ }
- public void rollback()
- {
+ /**
+ * Commits all messages done in this transaction and releases any locks currently held.
+ *
+ * <p/>If the commit fails, because the commit itself is interrupted by a fail-over between requesting that the
+ * commit be done, and receiving an acknowledgement that it has been done, then a JMSException will be thrown. The
+ * client will be unable to determine whether or not the commit actually happened on the broker in this case.
+ *
+ * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
+ * not mean that the commit is known to have failed, merely that it is not known whether it
+ * failed or not.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void commit() throws JMSException
+ {
+ checkTransacted();
- synchronized (_lock)
+ try
+ {
+ // Acknowledge up to message last delivered (if any) for each consumer.
+ // need to send ack for messages delivered to consumers so far
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
- boolean isStopped = connectionStopped();
-
- if (!isStopped)
- {
- setConnectionStopped(true);
- }
-
- rejectAllMessages(true);
-
- _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
-
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- if (!consumer.isNoConsume())
- {
- consumer.rollback();
- }
- else
- {
- // should perhaps clear the _SQ here.
- //consumer._synchronousQueue.clear();
- consumer.clearReceiveQueue();
- }
-
-
- }
-
- setConnectionStopped(isStopped);
+ // Sends acknowledgement to server
+ i.next().acknowledgeLastDelivered();
}
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
+ TxCommitOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
}
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ }
+ }
- public void rejectPending(BasicMessageConsumer consumer)
+ public void confirmConsumerCancelled(AMQShortString consumerTag)
+ {
+
+ // Remove the consumer from the map
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if (consumer != null)
{
- synchronized (_lock)
+ // fixme this isn't right.. needs to check if _queue contains data for this consumer
+ if (consumer.isAutoClose()) // && _queue.isEmpty())
{
- boolean stopped = _dispatcher.connectionStopped();
+ consumer.closeWhenNoMessages(true);
+ }
- if (!stopped)
+ if (!consumer.isNoConsume())
+ {
+ // Clean the Maps up first
+ // Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
{
- _dispatcher.setConnectionStopped(true);
+ _logger.info("Dispatcher is not null");
}
+ else
+ {
+ _logger.info("Dispatcher is null so created stopped dispatcher");
- // Reject messages on pre-receive queue
- consumer.rollback();
-
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
-
- // closeConsumer
- consumer.markClosed();
-
- _dispatcher.setConnectionStopped(stopped);
+ startDistpatcherIfNecessary(true);
+ }
+ _dispatcher.rejectPending(consumer);
}
- }
- }
-
-
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
- {
-
- _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
- _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
- _immediatePrefetch = _strictAMQP || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+ else
+ {
+ // Just close the consumer
+ // fixme the CancelOK is being processed before the arriving messages..
+ // The dispatcher is still to process them so the server sent in order but the client
+ // has yet to receive before the close comes in.
- _connection = con;
- _transacted = transacted;
- if (transacted)
- {
- _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
+ // consumer.markClosed();
+ }
}
else
{
- _acknowledgeMode = acknowledgeMode;
+ _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
}
- _channelId = channelId;
- _messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ }
+
+ public QueueBrowser createBrowser(Queue queue) throws JMSException
+ {
+ if (isStrictAMQP())
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
- new FlowControllingBlockingQueue.ThresholdListener()
- {
- public void aboveThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
- }
- }
-
- public void underThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
- }
- }
- });
+ throw new UnsupportedOperationException();
}
- else
+
+ return createBrowser(queue, null);
+ }
+
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+ {
+ if (isStrictAMQP())
{
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+ throw new UnsupportedOperationException();
}
- }
+ checkNotClosed();
+ checkValidQueue(queue);
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- public AMQConnection getAMQConnection()
+ public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
{
- return _connection;
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ messageSelector, null, true, true);
}
public BytesMessage createBytesMessage() throws JMSException
@@ -506,1506 +678,1548 @@
synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
+
return new JMSBytesMessage();
}
}
- public MapMessage createMapMessage() throws JMSException
+ public MessageConsumer createConsumer(Destination destination) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
- return new JMSMapMessage();
- }
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
+ false, false);
}
- public javax.jms.Message createMessage() throws JMSException
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
- return createBytesMessage();
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
+ messageSelector, null, false, false);
}
- public ObjectMessage createObjectMessage() throws JMSException
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
- return (ObjectMessage) new JMSObjectMessage();
- }
- }
+ checkValidDestination(destination);
- public ObjectMessage createObjectMessage(Serializable object) throws JMSException
- {
- ObjectMessage msg = createObjectMessage();
- msg.setObject(object);
- return msg;
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+ messageSelector, null, false, false);
}
- public StreamMessage createStreamMessage() throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+ String selector) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
+ checkValidDestination(destination);
- return new JMSStreamMessage();
- }
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
- public TextMessage createTextMessage() throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
+ boolean exclusive, String selector) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
- {
- checkNotClosed();
+ checkValidDestination(destination);
- return new JMSTextMessage();
- }
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
- public TextMessage createTextMessage(String text) throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+ String selector, FieldTable rawSelector) throws JMSException
{
+ checkValidDestination(destination);
- TextMessage msg = createTextMessage();
- msg.setText(text);
- return msg;
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
}
- public boolean getTransacted() throws JMSException
+ public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
+ boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
{
- checkNotClosed();
- return _transacted;
- }
+ checkValidDestination(destination);
- public int getAcknowledgeMode() throws JMSException
- {
- checkNotClosed();
- return _acknowledgeMode;
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
+ false);
}
- public void commit() throws JMSException
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
- checkTransacted();
- try
+
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
{
- // Acknowledge up to message last delivered (if any) for each consumer.
- //need to send ack for messages delivered to consumers so far
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ if (subscriber.getTopic().equals(topic))
{
- //Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
}
-
- // Commits outstanding messages sent and outstanding acknowledgements.
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQProtocolHandler handler = getProtocolHandler();
-
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion()),
- TxCommitOkBody.class);
- }
- catch (AMQException e)
- {
- JMSException exception = new JMSException("Failed to commit: " + e.getMessage());
- exception.setLinkedException(e);
- throw exception;
}
- }
-
-
- public void rollback() throws JMSException
- {
- synchronized (_suspensionLock)
+ else
{
- checkTransacted();
- try
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
-
- boolean isSuspended = isSuspended();
+ topicName = ((AMQTopic) topic).getDestinationName();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
- if (!isSuspended)
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
{
- suspendChannel(true);
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
}
-
- if (_dispatcher != null)
+ else
{
- _dispatcher.rollback();
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
}
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
-
-
- if (!isSuspended)
- {
- suspendChannel(false);
- }
+ deleteQueue(dest.getAMQQueueName());
}
- catch (AMQException e)
+ else
{
- throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
}
}
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
}
- public void close() throws JMSException
+ /** Note, currently this does not handle reuse of the same name with different topics correctly. */
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
{
- close(-1);
+ checkNotClosed();
+ checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
}
- public void close(long timeout) throws JMSException
+ public MapMessage createMapMessage() throws JMSException
{
- if (_logger.isInfoEnabled())
+ synchronized (_connection.getFailoverMutex())
{
- _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ checkNotClosed();
+
+ return new JMSMapMessage();
}
+ }
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session
+ public javax.jms.Message createMessage() throws JMSException
+ {
+ return createBytesMessage();
+ }
+
+ public ObjectMessage createObjectMessage() throws JMSException
+ {
synchronized (_connection.getFailoverMutex())
{
- //Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
- {
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
- {
+ checkNotClosed();
- getProtocolHandler().closeSession(this);
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+ return (ObjectMessage) new JMSObjectMessage();
+ }
+ }
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully
+ public ObjectMessage createObjectMessage(Serializable object) throws JMSException
+ {
+ ObjectMessage msg = createObjectMessage();
+ msg.setObject(object);
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- _connection.deregisterSession(_channelId);
- }
- }
- }
+ return msg;
}
- private AMQProtocolHandler getProtocolHandler()
+ public BasicMessageProducer createProducer(Destination destination) throws JMSException
{
- return _connection.getProtocolHandler();
+ return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
+ public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+ {
+ return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+ }
- private byte getProtocolMinorVersion()
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ throws JMSException
{
- return getProtocolHandler().getProtocolMinorVersion();
+ return createProducerImpl(destination, mandatory, immediate);
}
- private byte getProtocolMajorVersion()
+ public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+ boolean waitUntilSent) throws JMSException
{
- return getProtocolHandler().getProtocolMajorVersion();
+ return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
}
+ public TopicPublisher createPublisher(Topic topic) throws JMSException
+ {
+ checkNotClosed();
- /**
- * Close all producers or consumers. This is called either in the error case or when closing the session normally.
- *
- * @param amqe the exception, may be null to indicate no error has occurred
- */
- private void closeProducersAndConsumers(AMQException amqe) throws JMSException
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+ }
+
+ public Queue createQueue(String queueName) throws JMSException
{
- JMSException jmse = null;
- try
- {
- closeProducers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- jmse = e;
- }
- try
+ checkNotClosed();
+ if (queueName.indexOf('/') == -1)
{
- closeConsumers(amqe);
+ return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
}
- catch (JMSException e)
+ else
{
- _logger.error("Error closing session: " + e, e);
- if (jmse == null)
+ try
{
- jmse = e;
+ return new AMQQueue(new AMQBindingURL(queueName));
}
- }
- if (jmse != null)
- {
- throw jmse;
- }
- }
-
+ catch (URLSyntaxException urlse)
+ {
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
- public boolean isSuspended()
- {
- return _suspended;
+ throw jmse;
+ }
+ }
}
-
/**
- * Called when the server initiates the closure of the session unilaterally.
+ * Declares the named queue.
*
- * @param e the exception that caused this session to be closed. Null causes the
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param name The name of the queue to declare.
+ * @param autoDelete Flag to indicate that the queue is declared as auto-delete.
+ * @param durable Flag to indicate that the queue is durable.
+ * @param exclusive Flag to indicate that the queue is exclusive to this client.
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
*/
- public void closed(Throwable e) throws JMSException
+ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive) throws AMQException
{
- synchronized (_connection.getFailoverMutex())
+ new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- _closed.set(true);
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
+ public Object execute() throws AMQException, FailoverException
{
- amqe = new AMQException(null, "Closing session forcibly", e);
+ AMQFrame queueDeclare =
+ QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ autoDelete, // autoDelete
+ durable, // durable
+ exclusive, // exclusive
+ false, // nowait
+ false, // passive
+ name, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
+ return null;
}
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
- }
+ }, _connection).execute();
}
/**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
- * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex.
+ * Creates a QueueReceiver
+ *
+ * @param destination
+ *
+ * @return QueueReceiver - a wrapper around our MessageConsumer
+ *
+ * @throws JMSException
*/
- void markClosed()
+ public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
- _closed.set(true);
- _connection.deregisterSession(_channelId);
- markClosedProducersAndConsumers();
+ checkValidDestination(destination);
+ AMQQueue dest = (AMQQueue) destination;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+ return new QueueReceiverAdaptor(dest, consumer);
}
- private void markClosedProducersAndConsumers()
+ /**
+ * Creates a QueueReceiver using a message selector
+ *
+ * @param destination
+ * @param messageSelector
+ *
+ * @return QueueReceiver - a wrapper around our MessageConsumer
+ *
+ * @throws JMSException
+ */
+ public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
- try
- {
- // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
- closeProducers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
- try
- {
- markClosedConsumers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
+ checkValidDestination(destination);
+ AMQQueue dest = (AMQQueue) destination;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+
+ return new QueueReceiverAdaptor(dest, consumer);
}
/**
- * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
- * currently no way of propagating errors to message producers (this is a JMS limitation).
+ * Creates a QueueReceiver wrapping a MessageConsumer
+ *
+ * @param queue
+ *
+ * @return QueueReceiver
+ *
+ * @throws JMSException
*/
- private void closeProducers() throws JMSException
+ public QueueReceiver createReceiver(Queue queue) throws JMSException
{
- // we need to clone the list of producers since the close() method updates the _producers collection
- // which would result in a concurrent modification exception
- final ArrayList clonedProducers = new ArrayList(_producers.values());
+ checkNotClosed();
+ AMQQueue dest = (AMQQueue) queue;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
- final Iterator it = clonedProducers.iterator();
- while (it.hasNext())
- {
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
- prod.close();
- }
- // at this point the _producers map is empty
+ return new QueueReceiverAdaptor(dest, consumer);
}
/**
- * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+ * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
*
- * @param error not null if this is a result of an error occurring at the connection level
+ * @param queue
+ * @param messageSelector
+ *
+ * @return QueueReceiver
+ *
+ * @throws JMSException
*/
- private void closeConsumers(Throwable error) throws JMSException
+ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
- // we need to clone the list of consumers since the close() method updates the _consumers collection
- // which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values());
+ checkNotClosed();
+ AMQQueue dest = (AMQQueue) queue;
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
- while (it.hasNext())
- {
- final BasicMessageConsumer con = it.next();
- if (error != null)
- {
- con.notifyError(error);
- }
- else
- {
- con.close();
- }
- }
- // at this point the _consumers map will be empty
+ return new QueueReceiverAdaptor(dest, consumer);
}
- private void markClosedConsumers() throws JMSException
+ public QueueSender createSender(Queue queue) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
- // we need to clone the list of consumers since the close() method updates the _consumers collection
- // which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ checkNotClosed();
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
- while (it.hasNext())
+ // return (QueueSender) createProducer(queue);
+ return new QueueSenderAdapter(createProducer(queue), queue);
+ }
+
+ public StreamMessage createStreamMessage() throws JMSException
+ {
+ synchronized (_connection.getFailoverMutex())
{
- final BasicMessageConsumer con = it.next();
- con.markClosed();
+ checkNotClosed();
+
+ return new JMSStreamMessage();
}
- // at this point the _consumers map will be empty
}
/**
- * Asks the broker to resend all unacknowledged messages for the session.
+ * Creates a non-durable subscriber
+ *
+ * @param topic
+ *
+ * @return TopicSubscriber - a wrapper round our MessageConsumer
*
* @throws JMSException
*/
- public void recover() throws JMSException
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- checkNotTransacted(); // throws IllegalStateException if a transacted session
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
- try
+ AMQTopic dest = checkValidTopic(topic);
+
+ // AMQTopic dest = new AMQTopic(topic.getTopicName());
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ }
+
+ /**
+ * Creates a non-durable subscriber with a message selector
+ *
+ * @param topic
+ * @param messageSelector
+ * @param noLocal
+ *
+ * @return TopicSubscriber - a wrapper round our MessageConsumer
+ *
+ * @throws JMSException
+ */
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
+ {
+ checkNotClosed();
+ AMQTopic dest = checkValidTopic(topic);
+
+ // AMQTopic dest = new AMQTopic(topic.getTopicName());
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+ }
+
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ checkNotClosed();
+
+ return new AMQTemporaryQueue(this);
+ }
+
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ checkNotClosed();
+
+ return new AMQTemporaryTopic(this);
+ }
+
+ public TextMessage createTextMessage() throws JMSException
+ {
+ synchronized (_connection.getFailoverMutex())
{
+ checkNotClosed();
- boolean isSuspended = isSuspended();
+ return new JMSTextMessage();
+ }
+ }
- if (!isSuspended)
- {
- suspendChannel(true);
- }
+ public TextMessage createTextMessage(String text) throws JMSException
+ {
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- consumer.clearUnackedMessages();
- }
+ TextMessage msg = createTextMessage();
+ msg.setText(text);
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
+ return msg;
+ }
- if (isStrictAMQP())
+ public Topic createTopic(String topicName) throws JMSException
+ {
+ checkNotClosed();
+
+ if (topicName.indexOf('/') == -1)
+ {
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+ }
+ else
+ {
+ try
{
- // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false)); // requeue
- _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ return new AMQTopic(new AMQBindingURL(topicName));
}
- else
+ catch (URLSyntaxException urlse)
{
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false) // requeue
- , BasicRecoverOkBody.class);
- }
- if (!isSuspended)
- {
- suspendChannel(false);
+ throw jmse;
}
}
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
}
- boolean isInRecovery()
+ public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
{
- return _inRecovery;
+ declareExchange(name, type, getProtocolHandler(), nowait);
}
- void setInRecovery(boolean inRecovery)
+ public int getAcknowledgeMode() throws JMSException
{
- _inRecovery = inRecovery;
+ checkNotClosed();
+
+ return _acknowledgeMode;
}
- public void acknowledge() throws JMSException
+ public AMQConnection getAMQConnection()
{
- if (isClosed())
- {
- throw new IllegalStateException("Session is already closed");
- }
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- consumer.acknowledge();
- }
+ return _connection;
+ }
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+ public int getDefaultPrefetch()
+ {
+ return _defaultPrefetchHighMark;
}
+ public int getDefaultPrefetchHigh()
+ {
+ return _defaultPrefetchHighMark;
+ }
- public MessageListener getMessageListener() throws JMSException
+ public int getDefaultPrefetchLow()
{
-// checkNotClosed();
- return _messageListener;
+ return _defaultPrefetchLowMark;
}
- public void setMessageListener(MessageListener listener) throws JMSException
+ public AMQShortString getDefaultQueueExchangeName()
{
-// checkNotClosed();
-//
-// if (_dispatcher != null && !_dispatcher.connectionStopped())
-// {
-// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
-// }
-//
-// // We are stopped
-// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-// {
-// BasicMessageConsumer consumer = i.next();
-//
-// if (consumer.isReceiving())
-// {
-// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
-// }
-// }
-//
-// _messageListener = listener;
-//
-// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-// {
-// i.next().setMessageListener(_messageListener);
-// }
+ return _connection.getDefaultQueueExchangeName();
+ }
+ public AMQShortString getDefaultTopicExchangeName()
+ {
+ return _connection.getDefaultTopicExchangeName();
}
- public void run()
+ public MessageListener getMessageListener() throws JMSException
{
- throw new java.lang.UnsupportedOperationException();
+ // checkNotClosed();
+ return _messageListener;
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
- throws JMSException
+ public AMQShortString getTemporaryQueueExchangeName()
{
- return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
+ return _connection.getTemporaryQueueExchangeName();
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
- throws JMSException
+ public AMQShortString getTemporaryTopicExchangeName()
{
- return createProducerImpl(destination, mandatory, immediate);
+ return _connection.getTemporaryTopicExchangeName();
}
- public BasicMessageProducer createProducer(Destination destination, boolean immediate)
- throws JMSException
+ public int getTicket()
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+ return _ticket;
}
- public BasicMessageProducer createProducer(Destination destination) throws JMSException
+ public boolean getTransacted() throws JMSException
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ checkNotClosed();
+
+ return _transacted;
}
- private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
- boolean immediate)
- throws JMSException
+ public boolean hasConsumer(Destination destination)
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+ return (counter != null) && (counter.get() != 0);
}
- private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent)
- throws JMSException
+ public boolean isStrictAMQP()
{
- return (BasicMessageProducer) new FailoverSupport()
- {
- public Object operation() throws JMSException
- {
- checkNotClosed();
- long producerId = getNextProducerId();
- BasicMessageProducer producer = new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
- AMQSession.this, getProtocolHandler(),
- producerId, immediate, mandatory, waitUntilSent);
- registerProducer(producerId, producer);
- return producer;
- }
- }.execute(_connection);
+ return _strictAMQP;
+ }
+
+ public boolean isSuspended()
+ {
+ return _suspended;
}
/**
- * Creates a QueueReceiver
- *
- * @param destination
- *
- * @return QueueReceiver - a wrapper around our MessageConsumer
+ * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. Puts the message onto
+ * the queue read by the dispatcher.
*
- * @throws JMSException
+ * @param message the message that has been received
*/
- public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
+ public void messageReceived(UnprocessedMessage message)
{
- checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
- return new QueueReceiverAdaptor(dest, consumer);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message["
+ + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
+ + "] received in session with channel id " + _channelId);
+ }
+
+ if (message.getDeliverBody() == null)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.add(message);
+ }
}
/**
- * Creates a QueueReceiver using a message selector
+ * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
*
- * @param destination
- * @param messageSelector
+ * <p/>All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges
+ * all messages that have been delivered to the client.
*
- * @return QueueReceiver - a wrapper around our MessageConsumer
+ * <p/>Restarting a session causes it to take the following actions:
*
- * @throws JMSException
+ * <ul> <li>Stop message delivery.</li> <li>Mark all messages that might have been delivered but not acknowledged as
+ * "redelivered".</li> <li>Restart the delivery sequence including all unacknowledged messages that had been previously
+ * delivered. Redelivered messages do not have to be delivered in exactly their original delivery order.</li> </ul>
+ *
+ * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
+ * receiving acknowledgement that it has, then a JMSException will be thrown. In this case it will not be possible
+ * for the client to determine whether the broker is going to recover the session or not.
+ *
+ * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
+ * Note that this does not necessarily mean that the recovery has failed, but simply that it is
+ * not possible to tell if it has or not.
+ * @todo Be aware of possible changes to parameter order as versions change.
*/
- public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
+ public void recover() throws JMSException
{
- checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer)
- createConsumer(destination, messageSelector);
- return new QueueReceiverAdaptor(dest, consumer);
- }
+ // Ensure that the session is open.
+ checkNotClosed();
- public MessageConsumer createConsumer(Destination destination) throws JMSException
- {
- checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- false,
- false,
- null,
- null,
- false,
- false);
- }
+ // Ensure that the session is not transacted.
+ checkNotTransacted();
- public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
- {
- checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- false,
- false,
- messageSelector,
- null,
- false,
- false);
- }
+ // this is set only here, and before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
+ try
+ {
- public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- noLocal,
- false,
- messageSelector,
- null,
- false,
- false);
- }
+ boolean isSuspended = isSuspended();
- public MessageConsumer createBrowserConsumer(Destination destination,
- String messageSelector,
- boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
- return createConsumerImpl(destination,
- _defaultPrefetchHighMark,
- _defaultPrefetchLowMark,
- noLocal,
- false,
- messageSelector,
- null,
- true,
- true);
+ if (!isSuspended)
+ {
+ suspendChannel(true);
+ }
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.clearUnackedMessages();
+ }
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ }
+ else
+ {
+
+ _connection.getProtocolHandler().syncWrite(
+ BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+ , BasicRecoverOkBody.class);
+ }
+
+ if (!isSuspended)
+ {
+ suspendChannel(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Recover failed: " + e.getMessage(), e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
+ }
}
- public MessageConsumer createConsumer(Destination destination,
- int prefetch,
- boolean noLocal,
- boolean exclusive,
- String selector) throws JMSException
+ public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
- checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
- }
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ }
- public MessageConsumer createConsumer(Destination destination,
- int prefetchHigh,
- int prefetchLow,
- boolean noLocal,
[... 2269 lines stripped ...]