You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [29/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Fri Oct 21 14:42:12 2011
@@ -21,9 +21,8 @@
package org.apache.qpid.client;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -34,8 +33,6 @@ import javax.naming.StringRefAddr;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Node;
-import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
-import org.apache.qpid.client.messaging.address.QpidQueueOptions;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -63,7 +60,7 @@ public abstract class AMQDestination imp
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -78,15 +75,10 @@ public abstract class AMQDestination imp
private boolean _exchangeExistsChecked;
- private byte[] _byteEncoding;
- private static final int IS_DURABLE_MASK = 0x1;
- private static final int IS_EXCLUSIVE_MASK = 0x2;
- private static final int IS_AUTODELETE_MASK = 0x4;
-
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -323,7 +315,11 @@ public abstract class AMQDestination imp
{
if(_urlAsShortString == null)
{
- toURL();
+ if (_url == null)
+ {
+ toURL();
+ }
+ _urlAsShortString = new AMQShortString(_url);
}
return _urlAsShortString;
}
@@ -370,7 +366,6 @@ public abstract class AMQDestination imp
// calculated URL now out of date
_url = null;
_urlAsShortString = null;
- _byteEncoding = null;
}
public AMQShortString getRoutingKey()
@@ -508,59 +503,10 @@ public abstract class AMQDestination imp
sb.deleteCharAt(sb.length() - 1);
url = sb.toString();
_url = url;
- _urlAsShortString = new AMQShortString(url);
}
return url;
}
- public byte[] toByteEncoding()
- {
- byte[] encoding = _byteEncoding;
- if(encoding == null)
- {
- int size = _exchangeClass.length() + 1 +
- _exchangeName.length() + 1 +
- 0 + // in place of the destination name
- (_queueName == null ? 0 : _queueName.length()) + 1 +
- 1;
- encoding = new byte[size];
- int pos = 0;
-
- pos = _exchangeClass.writeToByteArray(encoding, pos);
- pos = _exchangeName.writeToByteArray(encoding, pos);
-
- encoding[pos++] = (byte)0;
-
- if(_queueName == null)
- {
- encoding[pos++] = (byte)0;
- }
- else
- {
- pos = _queueName.writeToByteArray(encoding,pos);
- }
- byte options = 0;
- if(_isDurable)
- {
- options |= IS_DURABLE_MASK;
- }
- if(_isExclusive)
- {
- options |= IS_EXCLUSIVE_MASK;
- }
- if(_isAutoDelete)
- {
- options |= IS_AUTODELETE_MASK;
- }
- encoding[pos] = options;
-
-
- _byteEncoding = encoding;
-
- }
- return encoding;
- }
-
public boolean equals(Object o)
{
if (this == o)
@@ -614,53 +560,6 @@ public abstract class AMQDestination imp
null); // factory location
}
-
- public static Destination createDestination(byte[] byteEncodedDestination)
- {
- AMQShortString exchangeClass;
- AMQShortString exchangeName;
- AMQShortString routingKey;
- AMQShortString queueName;
- boolean isDurable;
- boolean isExclusive;
- boolean isAutoDelete;
-
- int pos = 0;
- exchangeClass = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= exchangeClass.length() + 1;
- exchangeName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= exchangeName.length() + 1;
- routingKey = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
- queueName = AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= (queueName == null ? 0 : queueName.length()) + 1;
- int options = byteEncodedDestination[pos];
- isDurable = (options & IS_DURABLE_MASK) != 0;
- isExclusive = (options & IS_EXCLUSIVE_MASK) != 0;
- isAutoDelete = (options & IS_AUTODELETE_MASK) != 0;
-
- if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
- {
- return new AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
- }
- else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
- {
- return new AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
- }
- else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
- {
- return new AMQHeadersExchange(routingKey);
- }
- else
- {
- return new AMQAnyDestination(exchangeName,exchangeClass,
- routingKey,isExclusive,
- isAutoDelete,queueName,
- isDurable, new AMQShortString[0]);
- }
-
- }
-
public static Destination createDestination(BindingURL binding)
{
AMQShortString type = binding.getExchangeClass();
@@ -842,12 +741,12 @@ public abstract class AMQDestination imp
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -895,7 +794,7 @@ public abstract class AMQDestination imp
return _browseOnly;
}
- public void setBrowseOnly(boolean b)
+ private void setBrowseOnly(boolean b)
{
_browseOnly = b;
}
@@ -925,7 +824,7 @@ public abstract class AMQDestination imp
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -938,4 +837,9 @@ public abstract class AMQDestination imp
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 21 14:42:12 2011
@@ -70,7 +70,6 @@ import org.apache.qpid.AMQDisconnectedEx
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -88,8 +87,6 @@ import org.apache.qpid.client.message.JM
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -97,7 +94,10 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,8 +213,6 @@ public abstract class AMQSession<C exten
*/
protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
- protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
-
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
@@ -310,7 +308,7 @@ public abstract class AMQSession<C exten
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -364,7 +362,13 @@ public abstract class AMQSession<C exten
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing to ensure that an auto ack is not sent.
*/
- private boolean _inRecovery;
+ private volatile boolean _sessionInRecovery;
+
+ /**
+ * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+ * to the syncReceiveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
+ */
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -567,6 +571,8 @@ public abstract class AMQSession<C exten
close(-1);
}
+ public abstract AMQException getLastException();
+
public void checkNotClosed() throws JMSException
{
try
@@ -575,16 +581,20 @@ public abstract class AMQSession<C exten
}
catch (IllegalStateException ise)
{
- // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+ AMQException ex = getLastException();
+ if (ex != null)
+ {
+ IllegalStateException ssnClosed = new IllegalStateException(
+ "Session has been closed", ex.getErrorCode().toString());
- if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+ ssnClosed.setLinkedException(ex);
+ ssnClosed.initCause(ex);
+ throw ssnClosed;
+ }
+ else
{
- ise.setLinkedException(manager.getLastException());
- ise.initCause(ise.getLinkedException());
+ throw ise;
}
-
- throw ise;
}
}
@@ -600,29 +610,36 @@ public abstract class AMQSession<C exten
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
* @throws IllegalStateException If the session is closed.
+ * @throws JMSException if there is a problem during the acknowledge process.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws IllegalStateException, JMSException
{
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- else if (hasFailedOver())
+ else if (hasFailedOverDirty())
{
+ //perform an implicit recover in this scenario
+ recover();
+
+ //notify the consumer
throw new IllegalStateException("has failed over");
}
- while (true)
+ try
{
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
- acknowledgeMessage(tag, false);
+ acknowledgeImpl();
+ markClean();
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
}
}
+ protected abstract void acknowledgeImpl() throws JMSException;
+
/**
* Acknowledge one or many messages.
*
@@ -757,6 +774,10 @@ public abstract class AMQSession<C exten
_logger.debug(
"Got FailoverException during channel close, ignored as channel already marked as closed.");
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Error closing session:" + e.getMessage(), e);
+ }
finally
{
_connection.deregisterSession(_channelId);
@@ -827,51 +848,44 @@ public abstract class AMQSession<C exten
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- * @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
checkTransacted();
- try
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
{
- //Check that we are clean to commit.
- if (_failedOverDirty)
+ if (_logger.isDebugEnabled())
{
- rollback();
-
- throw new TransactionRolledBackException("Connection failover has occured since last send. " +
- "Forced rollback");
+ _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
}
+ rollback();
+ throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+ "The session transaction was rolled back.");
+ }
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
- // Commits outstanding messages and acknowledgments
- sendCommit();
+ try
+ {
+ commitImpl();
markClean();
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+ throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ }
}
- public abstract void sendCommit() throws AMQException, FailoverException;
-
+ protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
public void confirmConsumerCancelled(int consumerTag)
{
@@ -949,7 +963,7 @@ public abstract class AMQSession<C exten
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -963,15 +977,7 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public C createExclusiveConsumer(Destination destination) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
- ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -979,7 +985,7 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -988,16 +994,7 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
- messageSelector, null, false, false);
+ messageSelector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1005,23 +1002,15 @@ public abstract class AMQSession<C exten
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector) throws JMSException
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
- }
-
- public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
- String selector, FieldTable rawSelector) throws JMSException
- {
- checkValidDestination(destination);
-
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1029,7 +1018,7 @@ public abstract class AMQSession<C exten
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
false);
}
@@ -1043,8 +1032,33 @@ public abstract class AMQSession<C exten
throws JMSException
{
checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic, true);
+ Topic origTopic = checkValidTopic(topic, true);
+
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ if (dest.getDestSyntax() == DestSyntax.ADDR &&
+ !dest.isAddressResolved())
+ {
+ try
+ {
+ handleAddressBasedDestination(dest,false,true);
+ if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
+ {
+ throw new JMSException("Durable subscribers can only be created for Topics");
+ }
+ dest.getSourceNode().setDurable(true);
+ }
+ catch(AMQException e)
+ {
+ JMSException ex = new JMSException("Error when verifying destination");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ catch(TransportException e)
+ {
+ throw toJMSException("Error when verifying destination", e);
+ }
+ }
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1056,15 +1070,9 @@ public abstract class AMQSession<C exten
// Not subscribed to this name in the current session
if (subscriber == null)
{
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- } else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
+ // After the address is resolved, the routing key will not be null.
+ AMQShortString topicName = dest.getRoutingKey();
+
if (_strictAMQP)
{
if (_strictAMQPFATAL)
@@ -1135,6 +1143,10 @@ public abstract class AMQSession<C exten
return subscriber;
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
+ }
finally
{
_subscriberDetails.unlock();
@@ -1195,12 +1207,6 @@ public abstract class AMQSession<C exten
return createProducerImpl(destination, mandatory, immediate);
}
- public P createProducer(Destination destination, boolean mandatory, boolean immediate,
- boolean waitUntilSent) throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
- }
-
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
@@ -1225,7 +1231,6 @@ public abstract class AMQSession<C exten
else
{
AMQQueue queue = new AMQQueue(queueName);
- queue.setCreate(AddressOption.ALWAYS);
return queue;
}
@@ -1307,8 +1312,8 @@ public abstract class AMQSession<C exten
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- C consumer = (C) createConsumer(destination);
+ Queue dest = validateQueue(destination);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1326,8 +1331,8 @@ public abstract class AMQSession<C exten
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- AMQQueue dest = (AMQQueue) destination;
- C consumer = (C) createConsumer(destination, messageSelector);
+ Queue dest = validateQueue(destination);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1344,7 +1349,7 @@ public abstract class AMQSession<C exten
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
checkNotClosed();
- AMQQueue dest = (AMQQueue) queue;
+ Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1363,17 +1368,28 @@ public abstract class AMQSession<C exten
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
- AMQQueue dest = (AMQQueue) queue;
+ Queue dest = validateQueue(queue);
C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
+
+ private Queue validateQueue(Destination dest) throws InvalidDestinationException
+ {
+ if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
+ {
+ return (Queue)dest;
+ }
+ else
+ {
+ throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
+ }
+ }
public QueueSender createSender(Queue queue) throws JMSException
{
checkNotClosed();
- // return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
@@ -1408,10 +1424,10 @@ public abstract class AMQSession<C exten
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- AMQTopic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
}
/**
@@ -1428,10 +1444,11 @@ public abstract class AMQSession<C exten
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- AMQTopic dest = checkValidTopic(topic);
+ checkValidTopic(topic);
- // AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor<C>(topic,
+ createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
+ true, messageSelector, null, false, false));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1533,10 +1550,8 @@ public abstract class AMQSession<C exten
abstract public void sync() throws AMQException;
- public int getAcknowledgeMode() throws JMSException
+ public int getAcknowledgeMode()
{
- checkNotClosed();
-
return _acknowledgeMode;
}
@@ -1596,10 +1611,8 @@ public abstract class AMQSession<C exten
return _ticket;
}
- public boolean getTransacted() throws JMSException
+ public boolean getTransacted()
{
- checkNotClosed();
-
return _transacted;
}
@@ -1695,13 +1708,14 @@ public abstract class AMQSession<C exten
// Ensure that the session is not transacted.
checkNotTransacted();
- // flush any acks we are holding in the buffer.
- flushAcknowledgments();
-
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
+
try
{
+ // flush any acks we are holding in the buffer.
+ flushAcknowledgments();
+
+ // this is only set true here, and only set false when the consumer's preDeliver method is called
+ _sessionInRecovery = true;
boolean isSuspended = isSuspended();
@@ -1709,9 +1723,18 @@ public abstract class AMQSession<C exten
{
suspendChannel(true);
}
-
+
+ // Set to true to short circuit delivery of anything currently
+ //in the pre-dispatch queue.
+ _usingDispatcherForCleanup = true;
+
syncDispatchQueue();
-
+
+ // Set to false before sending the recover as 0-8/9/9-1 will
+ //send messages back before the recover completes, and we
+ //probably shouldn't clean those! ;-)
+ _usingDispatcherForCleanup = false;
+
if (_dispatcher != null)
{
_dispatcher.recover();
@@ -1720,10 +1743,7 @@ public abstract class AMQSession<C exten
sendRecover();
markClean();
-
- // Set inRecovery to false before you start message flow again again.
- _inRecovery = false;
-
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1737,7 +1757,10 @@ public abstract class AMQSession<C exten
{
throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
-
+ catch(TransportException e)
+ {
+ throw toJMSException("Recover failed: " + e.getMessage(), e);
+ }
}
protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1795,9 +1818,7 @@ public abstract class AMQSession<C exten
suspendChannel(true);
}
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
syncDispatchQueue();
@@ -1822,6 +1843,10 @@ public abstract class AMQSession<C exten
{
throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
}
+ catch (TransportException e)
+ {
+ throw toJMSException("Failure to rollback:" + e.getMessage(), e);
+ }
}
}
@@ -1868,7 +1893,14 @@ public abstract class AMQSession<C exten
*/
public void unsubscribe(String name) throws JMSException
{
- unsubscribe(name, false);
+ try
+ {
+ unsubscribe(name, false);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
+ }
}
/**
@@ -1945,6 +1977,12 @@ public abstract class AMQSession<C exten
{
checkTemporaryDestination(destination);
+ if(!noConsume && isBrowseOnlyDestination(destination))
+ {
+ throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
+ "but a 'browseOnly' Destination has been supplied.");
+ }
+
final String messageSelector;
if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -1989,8 +2027,16 @@ public abstract class AMQSession<C exten
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
- C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ C consumer;
+ try
+ {
+ consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ }
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
+ }
if (_messageListener != null)
{
@@ -2027,7 +2073,10 @@ public abstract class AMQSession<C exten
ex.initCause(e);
throw ex;
}
-
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
+ }
return consumer;
}
}, _connection).execute();
@@ -2092,7 +2141,7 @@ public abstract class AMQSession<C exten
boolean isInRecovery()
{
- return _inRecovery;
+ return _sessionInRecovery;
}
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2214,7 +2263,7 @@ public abstract class AMQSession<C exten
void setInRecovery(boolean inRecovery)
{
- _inRecovery = inRecovery;
+ _sessionInRecovery = inRecovery;
}
boolean isStarted()
@@ -2395,7 +2444,7 @@ public abstract class AMQSession<C exten
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException
+ protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
{
if (topic == null)
{
@@ -2414,17 +2463,17 @@ public abstract class AMQSession<C exten
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+ topic.getClass().getName());
}
- return (AMQTopic) topic;
+ return topic;
}
- protected AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected Topic checkValidTopic(Topic topic) throws JMSException
{
return checkValidTopic(topic, false);
}
@@ -2553,15 +2602,9 @@ public abstract class AMQSession<C exten
public abstract void sendConsume(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
- }
-
- private P createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent) throws JMSException
- {
return new FailoverRetrySupport<P, JMSException>(
new FailoverProtectedOperation<P, JMSException>()
{
@@ -2569,8 +2612,18 @@ public abstract class AMQSession<C exten
{
checkNotClosed();
long producerId = getNextProducerId();
- P producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+
+ P producer;
+ try
+ {
+ producer = createMessageProducer(destination, mandatory,
+ immediate, producerId);
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
+ }
+
registerProducer(producerId, producer);
return producer;
@@ -2579,7 +2632,7 @@ public abstract class AMQSession<C exten
}
public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException;
+ final boolean immediate, final long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
@@ -2722,6 +2775,21 @@ public abstract class AMQSession<C exten
}
}
+ /**
+ * Undeclares the specified temporary queue/topic.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqQueue The temporary destination (queue or topic) to delete.
+ *
+ * @throws JMSException If the queue could not be deleted for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
+ {
+ deleteQueue(amqQueue.getAMQQueueName());
+ }
+
public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
@@ -2819,6 +2887,7 @@ public abstract class AMQSession<C exten
{
declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
}
+ bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
}
AMQShortString queueName = amqd.getAMQQueueName();
@@ -2826,8 +2895,6 @@ public abstract class AMQSession<C exten
// store the consumer queue name
consumer.setQueuename(queueName);
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
-
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
{
@@ -2978,6 +3045,10 @@ public abstract class AMQSession<C exten
{
throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
}
+ catch (TransportException e)
+ {
+ throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+ }
}
}
@@ -3016,21 +3087,11 @@ public abstract class AMQSession<C exten
*
* @return boolean true if failover has occured.
*/
- public boolean hasFailedOver()
+ public boolean hasFailedOverDirty()
{
return _failedOverDirty;
}
- /**
- * Check to see if any message have been sent in this transaction and have not been commited.
- *
- * @return boolean true if a message has been sent but not commited
- */
- public boolean isDirty()
- {
- return _dirty;
- }
-
public void setTicket(int ticket)
{
_ticket = ticket;
@@ -3143,7 +3204,7 @@ public abstract class AMQSession<C exten
setConnectionStopped(true);
}
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -3292,9 +3353,14 @@ public abstract class AMQSession<C exten
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message because delivery tag " + deliveryTag
+ + " <= rollback mark " + _rollbackMark.get());
+ }
rejectMessage(message, true);
}
- else if (isInRecovery())
+ else if (_usingDispatcherForCleanup)
{
_unacknowledgedMessageTags.add(deliveryTag);
}
@@ -3353,6 +3419,11 @@ public abstract class AMQSession<C exten
// Don't reject if we're already closing
if (!_closed.get())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ }
rejectMessage(message, true);
}
}
@@ -3450,4 +3521,48 @@ public abstract class AMQSession<C exten
{
return _closing.get()|| _connection.isClosing();
}
+
+ public boolean isDeclareExchanges()
+ {
+ return DECLARE_EXCHANGES;
+ }
+
+ JMSException toJMSException(String message, TransportException e)
+ {
+ int code = getErrorCode(e);
+ JMSException jmse = new JMSException(message, Integer.toString(code));
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ return jmse;
+ }
+
+ private int getErrorCode(TransportException e)
+ {
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (e instanceof SessionException)
+ {
+ SessionException se = (SessionException) e;
+ if(se.getException() != null && se.getException().getErrorCode() != null)
+ {
+ code = se.getException().getErrorCode().getValue();
+ }
+ }
+ return code;
+ }
+
+ private boolean isBrowseOnlyDestination(Destination destination)
+ {
+ return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
+ }
+
+ private void setRollbackMark()
+ {
+ // Let the dispatcher know that all the incoming messages
+ // should be rolled back (reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+ }
+ }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Oct 21 14:42:12 2011
@@ -47,6 +47,8 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -56,6 +58,7 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -69,6 +72,7 @@ import org.apache.qpid.transport.RangeSe
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -156,13 +160,20 @@ public class AMQSession_0_10 extends AMQ
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
{
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- _qpidSession = _qpidConnection.createSession(1);
+ if (name == null)
+ {
+ _qpidSession = _qpidConnection.createSession(1);
+ }
+ else
+ {
+ _qpidSession = _qpidConnection.createSession(name,1);
+ }
_qpidSession.setSessionListener(this);
if (_transacted)
{
@@ -189,11 +200,12 @@ public class AMQSession_0_10 extends AMQ
* @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
{
this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow);
+ defaultPrefetchHigh, defaultPrefetchLow,name);
}
private void addUnacked(int id)
@@ -258,7 +270,7 @@ public class AMQSession_0_10 extends AMQ
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
{
flushAcknowledgments();
}
@@ -282,23 +294,34 @@ public class AMQSession_0_10 extends AMQ
}
}
- void messageAcknowledge(RangeSet ranges, boolean accept)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
- Session ssn = getQpidSession();
- for (Range range : ranges)
+ final Session ssn = getQpidSession();
+ flushProcessed(ranges,accept);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
}
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
+ }
+
+ /**
+ * Flush any outstanding commands. This causes session complete to be sent.
+ * @param ranges the range of command ids.
+ * @param batch true if batched.
+ */
+ void flushProcessed(final RangeSet ranges, final boolean batch)
+ {
+ final Session ssn = getQpidSession();
+ for (final Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+ ssn.processed(range);
}
+ ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
@@ -314,7 +337,7 @@ public class AMQSession_0_10 extends AMQ
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
final FieldTable arguments, final AMQShortString exchangeName,
final AMQDestination destination, final boolean nowait)
- throws AMQException, FailoverException
+ throws AMQException
{
if (destination.getDestSyntax() == DestSyntax.BURL)
{
@@ -400,25 +423,6 @@ public class AMQSession_0_10 extends AMQ
}
}
-
- /**
- * Commit the receipt and the delivery of all messages exchanged by this session resources.
- */
- public void sendCommit() throws AMQException, FailoverException
- {
- getQpidSession().setAutoSync(true);
- try
- {
- getQpidSession().txCommit();
- }
- finally
- {
- getQpidSession().setAutoSync(false);
- }
- // We need to sync so that we get notify of an error.
- sync();
- }
-
/**
* Create a queue with a given name.
*
@@ -451,6 +455,14 @@ public class AMQSession_0_10 extends AMQ
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
+ RangeSet ranges = gatherUnackedRangeSet();
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ // We need to sync so that we get notified of an error.
+ sync();
+ }
+
+ private RangeSet gatherUnackedRangeSet()
+ {
RangeSet ranges = new RangeSet();
while (true)
{
@@ -459,11 +471,11 @@ public class AMQSession_0_10 extends AMQ
{
break;
}
- ranges.add((int) (long) tag);
+
+ ranges.add(tag.intValue());
}
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- sync();
+
+ return ranges;
}
@@ -537,7 +549,6 @@ public class AMQSession_0_10 extends AMQ
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
- throws JMSException
{
boolean res;
ExchangeBoundResult bindingQueryResult =
@@ -600,10 +611,16 @@ public class AMQSession_0_10 extends AMQ
(Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
+ boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+
+ if (consumer.getDestination().getLink() != null)
+ {
+ acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
+ }
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
- getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
@@ -659,13 +676,12 @@ public class AMQSession_0_10 extends AMQ
* Create an 0_10 message producer
*/
public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent,
- long producerId) throws JMSException
+ final boolean immediate, final long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
- getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -675,6 +691,10 @@ public class AMQSession_0_10 extends AMQ
throw ex;
}
+ catch(TransportException e)
+ {
+ throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
+ }
}
@@ -767,7 +787,7 @@ public class AMQSession_0_10 extends AMQ
else
{
QueueNode node = (QueueNode)amqd.getSourceNode();
- getQpidSession().queueDeclare(queueName.toString(), "" ,
+ getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
node.getDeclareArgs(),
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
@@ -904,7 +924,26 @@ public class AMQSession_0_10 extends AMQ
setCurrentException(exc);
}
- public void closed(Session ssn) {}
+ public void closed(Session ssn)
+ {
+ try
+ {
+ super.closed(null);
+ if (flushTask != null)
+ {
+ flushTask.cancel();
+ flushTask = null;
+ }
+ } catch (Exception e)
+ {
+ _logger.error("Error closing JMS session", e);
+ }
+ }
+
+ public AMQException getLastException()
+ {
+ return getCurrentException();
+ }
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal, final boolean nowait)
@@ -958,27 +997,26 @@ public class AMQSession_0_10 extends AMQ
}
}
- @Override public void commit() throws JMSException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
- checkTransacted();
- try
+ if( _txSize > 0 )
{
- if( _txSize > 0 )
- {
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
- }
- sendCommit();
+ messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
}
- catch (AMQException e)
+
+ getQpidSession().setAutoSync(true);
+ try
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ getQpidSession().txCommit();
}
- catch (FailoverException e)
+ finally
{
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ getQpidSession().setAutoSync(false);
}
+ // We need to sync so that we get notified of an error.
+ sync();
}
protected final boolean tagLE(long tag1, long tag2)
@@ -1020,11 +1058,9 @@ public class AMQSession_0_10 extends AMQ
code = ee.getErrorCode().getValue();
}
AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
-
- _connection.exceptionReceived(amqe);
-
_currentException = amqe;
}
+ _connection.exceptionReceived(_currentException);
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1068,22 +1104,37 @@ public class AMQSession_0_10 extends AMQ
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
{
boolean match = true;
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
-
- if (match && assertNode)
+ try
{
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (match)
+ {
+ // should I use the queried details to update the local data structure.
+ }
}
- else if (match)
+ catch(SessionException e)
{
- // should I use the queried details to update the local data structure.
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ {
+ match = false;
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+ "Error querying queue",e);
+ }
}
return match;
@@ -1128,8 +1179,8 @@ public class AMQSession_0_10 extends AMQ
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1149,6 +1200,22 @@ public class AMQSession_0_10 extends AMQ
int type = resolveAddressType(dest);
+ if (type == AMQDestination.QUEUE_TYPE &&
+ dest.getLink().getReliability() == Reliability.UNSPECIFIED)
+ {
+ dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
+ }
+ else if (type == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getReliability() == Reliability.UNSPECIFIED)
+ {
+ dest.getLink().setReliability(Reliability.UNRELIABLE);
+ }
+ else if (type == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
+ {
+ throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
+ }
+
switch (type)
{
case AMQDestination.QUEUE_TYPE:
@@ -1162,6 +1229,8 @@ public class AMQSession_0_10 extends AMQ
{
setLegacyFiledsForQueueType(dest);
send0_10QueueDeclare(dest,null,false,noWait);
+ sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
+ null,dest.getExchangeName(),dest, false);
break;
}
}
@@ -1200,7 +1269,7 @@ public class AMQSession_0_10 extends AMQ
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
@@ -1270,6 +1339,8 @@ public class AMQSession_0_10 extends AMQ
dest.getQueueName(),// should have one by now
dest.getSubject(),
Collections.<String,Object>emptyMap()));
+ sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
+ null,dest.getExchangeName(),dest, false);
}
public void setLegacyFiledsForQueueType(AMQDestination dest)
@@ -1307,5 +1378,26 @@ public class AMQSession_0_10 extends AMQ
sb.append(">");
return sb.toString();
}
-
+
+ protected void acknowledgeImpl()
+ {
+ RangeSet range = gatherUnackedRangeSet();
+
+ if(range.size() > 0 )
+ {
+ messageAcknowledge(range, true);
+ getQpidSession().sync();
+ }
+ }
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ // Also reset the delivery tag tracker, to ensure we don't
+ // return the first <total number of msgs received on session>
+ // messages sent by the brokers following the first rollback
+ // after failover
+ _highestDeliveryTag.set(-1);
+ super.resubscribe();
+ }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 21 14:42:12 2011
@@ -38,6 +38,7 @@ import org.apache.qpid.client.message.Re
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
@@ -75,12 +76,12 @@ import org.apache.qpid.framing.amqp_0_91
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
-
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -90,7 +91,7 @@ public final class AMQSession_0_8 extend
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
@@ -108,7 +109,7 @@ public final class AMQSession_0_8 extend
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
@@ -124,6 +125,20 @@ public final class AMQSession_0_8 extend
return getProtocolHandler().getProtocolVersion();
}
+ protected void acknowledgeImpl()
+ {
+ while (true)
+ {
+ Long tag = _unacknowledgedMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+ }
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -153,7 +168,7 @@ public final class AMQSession_0_8 extend
// we also need to check the state manager for 08/09 as the
// _connection variable may not be updated in time by the error receiving
// thread.
- // We can't close the session if we are alreadying in the process of
+ // We can't close the session if we are already in the process of
// closing/closed the connection.
if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -169,8 +184,20 @@ public final class AMQSession_0_8 extend
}
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
+ // Acknowledge all delivered messages
+ while (true)
+ {
+ Long tag = _deliveredMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
@@ -400,12 +427,12 @@ public final class AMQSession_0_8 extend
public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
+ final boolean immediate, long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
- this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ this, getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
{
@@ -577,6 +604,18 @@ public final class AMQSession_0_8 extend
}
+ @Override
+ protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
+ throws JMSException
+ {
+ // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
+ // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the
+ // client explicitly deletes it.
+
+ /* intentional no-op */
+ }
+
public boolean isQueueBound(String exchangeName, String queueName,
String bindingKey, Map<String, Object> args) throws JMSException
{
@@ -584,4 +623,34 @@ public final class AMQSession_0_8 extend
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
+
+
+ public AMQException getLastException()
+ {
+ // if the Connection has closed then we should throw any exception that
+ // has occurred that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler()
+ .getStateManager();
+
+ Exception e = manager.getLastException();
+ if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
+ && e != null)
+ {
+ if (e instanceof AMQException)
+ {
+ return (AMQException) e;
+ }
+ else
+ {
+ AMQException amqe = new AMQException(AMQConstant
+ .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
+ e.getMessage(), e.getCause());
+ return amqe;
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Fri Oct 21 14:42:12 2011
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.client;
+import java.util.UUID;
+
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
import org.apache.qpid.framing.AMQShortString;
-import java.util.Random;
-import java.util.UUID;
-
/** AMQ implementation of a TemporaryQueue. */
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
@@ -50,11 +49,15 @@ final class AMQTemporaryQueue extends AM
{
throw new JMSException("Temporary Queue has consumers so cannot be deleted");
}
- _deleted = true;
- // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
- // by the server when there are no more subscriptions to that queue. This is probably not
- // quite right for JMSCompliance.
+ try
+ {
+ _session.deleteTemporaryDestination(this);
+ }
+ finally
+ {
+ _deleted = true;
+ }
}
public AMQSession getSession()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Oct 21 14:42:12 2011
@@ -53,10 +53,14 @@ class AMQTemporaryTopic extends AMQTopic
throw new JMSException("Temporary Topic has consumers so cannot be deleted");
}
- _deleted = true;
- // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
- // by the server when there are no more subscriptions to that queue. This is probably not
- // quite right for JMSCompliance.
+ try
+ {
+ _session.deleteTemporaryDestination(this);
+ }
+ finally
+ {
+ _deleted = true;
+ }
}
public AMQSession getSession()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestina
super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
}
- public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- if (topic.getDestSyntax() == DestSyntax.ADDR)
+ if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
{
- try
+ AMQDestination qpidTopic = (AMQDestination)topic;
+ if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
{
- AMQTopic t = new AMQTopic(topic.getAddress());
- AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
- // link is never null if dest was created using an address string.
- t.getLink().setName(queueName.asString());
- t.getSourceNode().setAutoDelete(false);
- t.getSourceNode().setDurable(true);
-
- // The legacy fields are also populated just in case.
- t.setQueueName(queueName);
- t.setAutoDelete(false);
- t.setDurable(true);
- return t;
+ try
+ {
+ AMQTopic t = new AMQTopic(qpidTopic.getAddress());
+ AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
+ // link is never null if dest was created using an address string.
+ t.getLink().setName(queueName.asString());
+ t.getSourceNode().setAutoDelete(false);
+ t.getSourceNode().setDurable(true);
+
+ // The legacy fields are also populated just in case.
+ t.setQueueName(queueName);
+ t.setAutoDelete(false);
+ t.setDurable(true);
+ return t;
+ }
+ catch(Exception e)
+ {
+ JMSException ex = new JMSException("Error creating durable topic");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
- catch(Exception e)
+ else
{
- JMSException ex = new JMSException("Error creating durable topic");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
+ return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+ getDurableTopicQueueName(subscriptionName, connection),
+ true);
}
}
else
{
- return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
- getDurableTopicQueueName(subscriptionName, connection),
- true);
+ throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic");
}
}
@@ -138,13 +147,17 @@ public class AMQTopic extends AMQDestina
public String getTopicName() throws JMSException
{
- if (super.getRoutingKey() == null && super.getSubject() != null)
+ if (getRoutingKey() != null)
{
- return super.getSubject();
+ return getRoutingKey().asString();
+ }
+ else if (getSubject() != null)
+ {
+ return getSubject();
}
else
{
- return super.getRoutingKey().toString();
+ return null;
}
}
@@ -163,12 +176,18 @@ public class AMQTopic extends AMQDestina
public AMQShortString getRoutingKey()
{
- if (super.getRoutingKey() == null && super.getSubject() != null)
+ if (super.getRoutingKey() != null)
+ {
+ return super.getRoutingKey();
+ }
+ else if (getSubject() != null)
{
- return new AMQShortString(super.getSubject());
+ return new AMQShortString(getSubject());
}
else
{
+ setRoutingKey(new AMQShortString(""));
+ setSubject("");
return super.getRoutingKey();
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org