You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [29/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 21 01:19:00 2011
@@ -70,6 +70,7 @@ import org.apache.qpid.AMQDisconnectedEx
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
@@ -87,6 +88,8 @@ import org.apache.qpid.client.message.JM
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -94,10 +97,7 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,6 +213,8 @@ public abstract class AMQSession<C exten
*/
protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
@@ -308,7 +310,7 @@ public abstract class AMQSession<C exten
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -362,13 +364,7 @@ public abstract class AMQSession<C exten
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing to ensure that an auto ack is not sent.
*/
- private volatile boolean _sessionInRecovery;
-
- /**
- * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
- * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
- */
- private volatile boolean _usingDispatcherForCleanup;
+ private boolean _inRecovery;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -571,8 +567,6 @@ public abstract class AMQSession<C exten
close(-1);
}
- public abstract AMQException getLastException();
-
public void checkNotClosed() throws JMSException
{
try
@@ -581,20 +575,16 @@ public abstract class AMQSession<C exten
}
catch (IllegalStateException ise)
{
- AMQException ex = getLastException();
- if (ex != null)
- {
- IllegalStateException ssnClosed = new IllegalStateException(
- "Session has been closed", ex.getErrorCode().toString());
+ // if the Connection has closed then we should throw any exception that has occurred that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
- ssnClosed.setLinkedException(ex);
- ssnClosed.initCause(ex);
- throw ssnClosed;
- }
- else
+ if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
{
- throw ise;
+ ise.setLinkedException(manager.getLastException());
+ ise.initCause(ise.getLinkedException());
}
+
+ throw ise;
}
}
@@ -610,36 +600,29 @@ public abstract class AMQSession<C exten
* Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
*
* @throws IllegalStateException If the session is closed.
- * @throws JMSException if there is a problem during acknowledge process.
*/
- public void acknowledge() throws IllegalStateException, JMSException
+ public void acknowledge() throws IllegalStateException
{
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- else if (hasFailedOverDirty())
+ else if (hasFailedOver())
{
- //perform an implicit recover in this scenario
- recover();
-
- //notify the consumer
throw new IllegalStateException("has failed over");
}
- try
- {
- acknowledgeImpl();
- markClean();
- }
- catch (TransportException e)
+ while (true)
{
- throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
+ Long tag = _unacknowledgedMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+ acknowledgeMessage(tag, false);
}
}
- protected abstract void acknowledgeImpl() throws JMSException;
-
/**
* Acknowledge one or many messages.
*
@@ -774,10 +757,6 @@ public abstract class AMQSession<C exten
_logger.debug(
"Got FailoverException during channel close, ignored as channel already marked as closed.");
}
- catch (TransportException e)
- {
- throw toJMSException("Error closing session:" + e.getMessage(), e);
- }
finally
{
_connection.deregisterSession(_channelId);
@@ -848,44 +827,51 @@ public abstract class AMQSession<C exten
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
+ * @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
checkTransacted();
- //Check that we are clean to commit.
- if (_failedOverDirty)
+ try
{
- if (_logger.isDebugEnabled())
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
{
- _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+ rollback();
+
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
}
- rollback();
- throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
- "The session transaction was rolled back.");
- }
- try
- {
- commitImpl();
+ // Acknowledge all delivered messages
+ while (true)
+ {
+ Long tag = _deliveredMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+ // Commits outstanding messages and acknowledgments
+ sendCommit();
markClean();
}
catch (AMQException e)
{
- throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
+ throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
- catch(TransportException e)
- {
- throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
- }
}
- protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
+ public abstract void sendCommit() throws AMQException, FailoverException;
+
public void confirmConsumerCancelled(int consumerTag)
{
@@ -963,7 +949,7 @@ public abstract class AMQSession<C exten
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
}
- protected MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
+ public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
@@ -977,7 +963,15 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- isBrowseOnlyDestination(destination), false);
+ ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ }
+
+ public C createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
+ ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -985,7 +979,7 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, isBrowseOnlyDestination(destination), false);
+ messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -994,7 +988,16 @@ public abstract class AMQSession<C exten
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, isBrowseOnlyDestination(destination), false);
+ messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ }
+
+ public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
+ messageSelector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -1002,15 +1005,23 @@ public abstract class AMQSession<C exten
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector) throws JMSException
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, isBrowseOnlyDestination(destination), false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
+ }
+
+ public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
+ String selector, FieldTable rawSelector) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -1018,7 +1029,7 @@ public abstract class AMQSession<C exten
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, isBrowseOnlyDestination(destination),
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
false);
}
@@ -1032,33 +1043,8 @@ public abstract class AMQSession<C exten
throws JMSException
{
checkNotClosed();
- Topic origTopic = checkValidTopic(topic, true);
-
+ AMQTopic origTopic = checkValidTopic(topic, true);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- if (dest.getDestSyntax() == DestSyntax.ADDR &&
- !dest.isAddressResolved())
- {
- try
- {
- handleAddressBasedDestination(dest,false,true);
- if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
- {
- throw new JMSException("Durable subscribers can only be created for Topics");
- }
- dest.getSourceNode().setDurable(true);
- }
- catch(AMQException e)
- {
- JMSException ex = new JMSException("Error when verifying destination");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
- }
- catch(TransportException e)
- {
- throw toJMSException("Error when verifying destination", e);
- }
- }
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1070,9 +1056,15 @@ public abstract class AMQSession<C exten
// Not subscribed to this name in the current session
if (subscriber == null)
{
- // After the address is resolved routing key will not be null.
- AMQShortString topicName = dest.getRoutingKey();
-
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ } else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
if (_strictAMQP)
{
if (_strictAMQPFATAL)
@@ -1143,10 +1135,6 @@ public abstract class AMQSession<C exten
return subscriber;
}
- catch (TransportException e)
- {
- throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
- }
finally
{
_subscriberDetails.unlock();
@@ -1207,6 +1195,12 @@ public abstract class AMQSession<C exten
return createProducerImpl(destination, mandatory, immediate);
}
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate,
+ boolean waitUntilSent) throws JMSException
+ {
+ return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
+ }
+
public TopicPublisher createPublisher(Topic topic) throws JMSException
{
checkNotClosed();
@@ -1231,6 +1225,7 @@ public abstract class AMQSession<C exten
else
{
AMQQueue queue = new AMQQueue(queueName);
+ queue.setCreate(AddressOption.ALWAYS);
return queue;
}
@@ -1312,8 +1307,8 @@ public abstract class AMQSession<C exten
public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
{
checkValidDestination(destination);
- Queue dest = validateQueue(destination);
- C consumer = (C) createConsumer(dest);
+ AMQQueue dest = (AMQQueue) destination;
+ C consumer = (C) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1331,8 +1326,8 @@ public abstract class AMQSession<C exten
public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- Queue dest = validateQueue(destination);
- C consumer = (C) createConsumer(dest, messageSelector);
+ AMQQueue dest = (AMQQueue) destination;
+ C consumer = (C) createConsumer(destination, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1349,7 +1344,7 @@ public abstract class AMQSession<C exten
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
checkNotClosed();
- Queue dest = validateQueue(queue);
+ AMQQueue dest = (AMQQueue) queue;
C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
@@ -1368,28 +1363,17 @@ public abstract class AMQSession<C exten
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
- Queue dest = validateQueue(queue);
+ AMQQueue dest = (AMQQueue) queue;
C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
-
- private Queue validateQueue(Destination dest) throws InvalidDestinationException
- {
- if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
- {
- return (Queue)dest;
- }
- else
- {
- throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
- }
- }
public QueueSender createSender(Queue queue) throws JMSException
{
checkNotClosed();
+ // return (QueueSender) createProducer(queue);
return new QueueSenderAdapter(createProducer(queue), queue);
}
@@ -1424,10 +1408,10 @@ public abstract class AMQSession<C exten
public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
+ AMQTopic dest = checkValidTopic(topic);
- return new TopicSubscriberAdaptor<C>(topic,
- createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, false, true, null, null, false, false));
+ // AMQTopic dest = new AMQTopic(topic.getTopicName());
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
/**
@@ -1444,11 +1428,10 @@ public abstract class AMQSession<C exten
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
{
checkNotClosed();
- checkValidTopic(topic);
+ AMQTopic dest = checkValidTopic(topic);
- return new TopicSubscriberAdaptor<C>(topic,
- createConsumerImpl(topic, _prefetchHighMark, _prefetchLowMark, noLocal,
- true, messageSelector, null, false, false));
+ // AMQTopic dest = new AMQTopic(topic.getTopicName());
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1550,8 +1533,10 @@ public abstract class AMQSession<C exten
abstract public void sync() throws AMQException;
- public int getAcknowledgeMode()
+ public int getAcknowledgeMode() throws JMSException
{
+ checkNotClosed();
+
return _acknowledgeMode;
}
@@ -1611,8 +1596,10 @@ public abstract class AMQSession<C exten
return _ticket;
}
- public boolean getTransacted()
+ public boolean getTransacted() throws JMSException
{
+ checkNotClosed();
+
return _transacted;
}
@@ -1708,14 +1695,13 @@ public abstract class AMQSession<C exten
// Ensure that the session is not transacted.
checkNotTransacted();
-
+ // flush any acks we are holding in the buffer.
+ flushAcknowledgments();
+
+ // this is set only here, and the before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
try
{
- // flush any acks we are holding in the buffer.
- flushAcknowledgments();
-
- // this is only set true here, and only set false when the consumers preDeliver method is called
- _sessionInRecovery = true;
boolean isSuspended = isSuspended();
@@ -1723,18 +1709,9 @@ public abstract class AMQSession<C exten
{
suspendChannel(true);
}
-
- // Set to true to short circuit delivery of anything currently
- //in the pre-dispatch queue.
- _usingDispatcherForCleanup = true;
-
+
syncDispatchQueue();
-
- // Set to false before sending the recover as 0-8/9/9-1 will
- //send messages back before the recover completes, and we
- //probably shouldn't clean those! ;-)
- _usingDispatcherForCleanup = false;
-
+
if (_dispatcher != null)
{
_dispatcher.recover();
@@ -1743,7 +1720,10 @@ public abstract class AMQSession<C exten
sendRecover();
markClean();
-
+
+ // Set inRecovery to false before you start message flow again again.
+ _inRecovery = false;
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1757,10 +1737,7 @@ public abstract class AMQSession<C exten
{
throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
- catch(TransportException e)
- {
- throw toJMSException("Recover failed: " + e.getMessage(), e);
- }
+
}
protected abstract void sendRecover() throws AMQException, FailoverException;
@@ -1818,7 +1795,9 @@ public abstract class AMQSession<C exten
suspendChannel(true);
}
- setRollbackMark();
+ // Let the dispatcher know that all the incomming messages
+ // should be rolled back(reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
syncDispatchQueue();
@@ -1843,10 +1822,6 @@ public abstract class AMQSession<C exten
{
throw new JMSAMQException("Fail-over interrupted rollback. Status of the rollback is uncertain.", e);
}
- catch (TransportException e)
- {
- throw toJMSException("Failure to rollback:" + e.getMessage(), e);
- }
}
}
@@ -1893,14 +1868,7 @@ public abstract class AMQSession<C exten
*/
public void unsubscribe(String name) throws JMSException
{
- try
- {
- unsubscribe(name, false);
- }
- catch (TransportException e)
- {
- throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e);
- }
+ unsubscribe(name, false);
}
/**
@@ -1977,12 +1945,6 @@ public abstract class AMQSession<C exten
{
checkTemporaryDestination(destination);
- if(!noConsume && isBrowseOnlyDestination(destination))
- {
- throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
- "but a 'browseOnly' Destination has been supplied.");
- }
-
final String messageSelector;
if (_strictAMQP && !((selector == null) || selector.equals("")))
@@ -2027,16 +1989,8 @@ public abstract class AMQSession<C exten
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
- C consumer;
- try
- {
- consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
- }
- catch(TransportException e)
- {
- throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
- }
+ C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
{
@@ -2073,10 +2027,7 @@ public abstract class AMQSession<C exten
ex.initCause(e);
throw ex;
}
- catch (TransportException e)
- {
- throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
- }
+
return consumer;
}
}, _connection).execute();
@@ -2141,7 +2092,7 @@ public abstract class AMQSession<C exten
boolean isInRecovery()
{
- return _sessionInRecovery;
+ return _inRecovery;
}
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2263,7 +2214,7 @@ public abstract class AMQSession<C exten
void setInRecovery(boolean inRecovery)
{
- _sessionInRecovery = inRecovery;
+ _inRecovery = inRecovery;
}
boolean isStarted()
@@ -2444,7 +2395,7 @@ public abstract class AMQSession<C exten
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
+ protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException
{
if (topic == null)
{
@@ -2463,17 +2414,17 @@ public abstract class AMQSession<C exten
("Cannot create a durable subscription with a temporary topic: " + topic);
}
- if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+ if (!(topic instanceof AMQTopic))
{
throw new javax.jms.InvalidDestinationException(
"Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+ topic.getClass().getName());
}
- return topic;
+ return (AMQTopic) topic;
}
- protected Topic checkValidTopic(Topic topic) throws JMSException
+ protected AMQTopic checkValidTopic(Topic topic) throws JMSException
{
return checkValidTopic(topic, false);
}
@@ -2602,9 +2553,15 @@ public abstract class AMQSession<C exten
public abstract void sendConsume(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+ private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
+ }
+
+ private P createProducerImpl(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent) throws JMSException
+ {
return new FailoverRetrySupport<P, JMSException>(
new FailoverProtectedOperation<P, JMSException>()
{
@@ -2612,18 +2569,8 @@ public abstract class AMQSession<C exten
{
checkNotClosed();
long producerId = getNextProducerId();
-
- P producer;
- try
- {
- producer = createMessageProducer(destination, mandatory,
- immediate, producerId);
- }
- catch (TransportException e)
- {
- throw toJMSException("Exception while creating producer:" + e.getMessage(), e);
- }
-
+ P producer = createMessageProducer(destination, mandatory,
+ immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
return producer;
@@ -2632,7 +2579,7 @@ public abstract class AMQSession<C exten
}
public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException;
+ final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
@@ -2775,21 +2722,6 @@ public abstract class AMQSession<C exten
}
}
- /**
- * Undeclares the specified temporary queue/topic.
- *
- * <p/>Note that this operation automatically retries in the event of fail-over.
- *
- * @param amqQueue The name of the temporary destination to delete.
- *
- * @throws JMSException If the queue could not be deleted for any reason.
- * @todo Be aware of possible changes to parameter order as versions change.
- */
- protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
- {
- deleteQueue(amqQueue.getAMQQueueName());
- }
-
public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
@@ -2887,7 +2819,6 @@ public abstract class AMQSession<C exten
{
declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
}
- bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
}
AMQShortString queueName = amqd.getAMQQueueName();
@@ -2895,6 +2826,8 @@ public abstract class AMQSession<C exten
// store the consumer queue name
consumer.setQueuename(queueName);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
+
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
{
@@ -3045,10 +2978,6 @@ public abstract class AMQSession<C exten
{
throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e);
}
- catch (TransportException e)
- {
- throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
- }
}
}
@@ -3087,11 +3016,21 @@ public abstract class AMQSession<C exten
*
* @return boolean true if failover has occured.
*/
- public boolean hasFailedOverDirty()
+ public boolean hasFailedOver()
{
return _failedOverDirty;
}
+ /**
+ * Check to see if any message have been sent in this transaction and have not been commited.
+ *
+ * @return boolean true if a message has been sent but not commited
+ */
+ public boolean isDirty()
+ {
+ return _dirty;
+ }
+
public void setTicket(int ticket)
{
_ticket = ticket;
@@ -3204,7 +3143,7 @@ public abstract class AMQSession<C exten
setConnectionStopped(true);
}
- setRollbackMark();
+ _rollbackMark.set(_highestDeliveryTag.get());
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -3353,14 +3292,9 @@ public abstract class AMQSession<C exten
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting message because delivery tag " + deliveryTag
- + " <= rollback mark " + _rollbackMark.get());
- }
rejectMessage(message, true);
}
- else if (_usingDispatcherForCleanup)
+ else if (isInRecovery())
{
_unacknowledgedMessageTags.add(deliveryTag);
}
@@ -3419,11 +3353,6 @@ public abstract class AMQSession<C exten
// Don't reject if we're already closing
if (!_closed.get())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
- + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
- }
rejectMessage(message, true);
}
}
@@ -3521,48 +3450,4 @@ public abstract class AMQSession<C exten
{
return _closing.get()|| _connection.isClosing();
}
-
- public boolean isDeclareExchanges()
- {
- return DECLARE_EXCHANGES;
- }
-
- JMSException toJMSException(String message, TransportException e)
- {
- int code = getErrorCode(e);
- JMSException jmse = new JMSException(message, Integer.toString(code));
- jmse.setLinkedException(e);
- jmse.initCause(e);
- return jmse;
- }
-
- private int getErrorCode(TransportException e)
- {
- int code = AMQConstant.INTERNAL_ERROR.getCode();
- if (e instanceof SessionException)
- {
- SessionException se = (SessionException) e;
- if(se.getException() != null && se.getException().getErrorCode() != null)
- {
- code = se.getException().getErrorCode().getValue();
- }
- }
- return code;
- }
-
- private boolean isBrowseOnlyDestination(Destination destination)
- {
- return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
- }
-
- private void setRollbackMark()
- {
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rollback mark is set to " + _rollbackMark.get());
- }
- }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Oct 21 01:19:00 2011
@@ -47,8 +47,6 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -58,7 +56,6 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -72,7 +69,6 @@ import org.apache.qpid.transport.RangeSe
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,20 +156,13 @@ public class AMQSession_0_10 extends AMQ
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
+ int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- if (name == null)
- {
- _qpidSession = _qpidConnection.createSession(1);
- }
- else
- {
- _qpidSession = _qpidConnection.createSession(name,1);
- }
+ _qpidSession = _qpidConnection.createSession(1);
_qpidSession.setSessionListener(this);
if (_transacted)
{
@@ -200,12 +189,11 @@ public class AMQSession_0_10 extends AMQ
* @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
- String name)
+ boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow,name);
+ defaultPrefetchHigh, defaultPrefetchLow);
}
private void addUnacked(int id)
@@ -270,7 +258,7 @@ public class AMQSession_0_10 extends AMQ
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
{
flushAcknowledgments();
}
@@ -294,34 +282,23 @@ public class AMQSession_0_10 extends AMQ
}
}
- void messageAcknowledge(final RangeSet ranges, final boolean accept)
+ void messageAcknowledge(RangeSet ranges, boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
+ void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
{
- final Session ssn = getQpidSession();
- flushProcessed(ranges,accept);
- if (accept)
+ Session ssn = getQpidSession();
+ for (Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
+ ssn.processed(range);
}
- }
-
- /**
- * Flush any outstanding commands. This causes session complete to be sent.
- * @param ranges the range of command ids.
- * @param batch true if batched.
- */
- void flushProcessed(final RangeSet ranges, final boolean batch)
- {
- final Session ssn = getQpidSession();
- for (final Range range : ranges)
+ ssn.flushProcessed(accept ? BATCH : NONE);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
}
- ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
@@ -337,7 +314,7 @@ public class AMQSession_0_10 extends AMQ
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
final FieldTable arguments, final AMQShortString exchangeName,
final AMQDestination destination, final boolean nowait)
- throws AMQException
+ throws AMQException, FailoverException
{
if (destination.getDestSyntax() == DestSyntax.BURL)
{
@@ -423,6 +400,25 @@ public class AMQSession_0_10 extends AMQ
}
}
+
+ /**
+ * Commit the receipt and the delivery of all messages exchanged by this session resources.
+ */
+ public void sendCommit() throws AMQException, FailoverException
+ {
+ getQpidSession().setAutoSync(true);
+ try
+ {
+ getQpidSession().txCommit();
+ }
+ finally
+ {
+ getQpidSession().setAutoSync(false);
+ }
+ // We need to sync so that we get notify of an error.
+ sync();
+ }
+
/**
* Create a queue with a given name.
*
@@ -455,14 +451,6 @@ public class AMQSession_0_10 extends AMQ
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
- RangeSet ranges = gatherUnackedRangeSet();
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- sync();
- }
-
- private RangeSet gatherUnackedRangeSet()
- {
RangeSet ranges = new RangeSet();
while (true)
{
@@ -471,11 +459,11 @@ public class AMQSession_0_10 extends AMQ
{
break;
}
-
- ranges.add(tag.intValue());
+ ranges.add((int) (long) tag);
}
-
- return ranges;
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ // We need to sync so that we get notify of an error.
+ sync();
}
@@ -549,6 +537,7 @@ public class AMQSession_0_10 extends AMQ
}
public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
+ throws JMSException
{
boolean res;
ExchangeBoundResult bindingQueryResult =
@@ -611,16 +600,10 @@ public class AMQSession_0_10 extends AMQ
(Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
- boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-
- if (consumer.getDestination().getLink() != null)
- {
- acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
- }
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
- acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
@@ -676,12 +659,13 @@ public class AMQSession_0_10 extends AMQ
* Create an 0_10 message producer
*/
public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException
+ final boolean immediate, final boolean waitUntilSent,
+ long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
- getProtocolHandler(), producerId, immediate, mandatory);
+ getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
catch (AMQException e)
{
@@ -691,10 +675,6 @@ public class AMQSession_0_10 extends AMQ
throw ex;
}
- catch(TransportException e)
- {
- throw toJMSException("Exception while creating message producer:" + e.getMessage(), e);
- }
}
@@ -787,7 +767,7 @@ public class AMQSession_0_10 extends AMQ
else
{
QueueNode node = (QueueNode)amqd.getSourceNode();
- getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() ,
+ getQpidSession().queueDeclare(queueName.toString(), "" ,
node.getDeclareArgs(),
node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
node.isDurable() ? Option.DURABLE : Option.NONE,
@@ -924,26 +904,7 @@ public class AMQSession_0_10 extends AMQ
setCurrentException(exc);
}
- public void closed(Session ssn)
- {
- try
- {
- super.closed(null);
- if (flushTask != null)
- {
- flushTask.cancel();
- flushTask = null;
- }
- } catch (Exception e)
- {
- _logger.error("Error closing JMS session", e);
- }
- }
-
- public AMQException getLastException()
- {
- return getCurrentException();
- }
+ public void closed(Session ssn) {}
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal, final boolean nowait)
@@ -997,26 +958,27 @@ public class AMQSession_0_10 extends AMQ
}
}
- public void commitImpl() throws AMQException, FailoverException, TransportException
+ @Override public void commit() throws JMSException
{
- if( _txSize > 0 )
+ checkTransacted();
+ try
{
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
+ if( _txSize > 0 )
+ {
+ messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
+ }
+ sendCommit();
}
-
- getQpidSession().setAutoSync(true);
- try
+ catch (AMQException e)
{
- getQpidSession().txCommit();
+ throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
}
- finally
+ catch (FailoverException e)
{
- getQpidSession().setAutoSync(false);
+ throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
}
- // We need to sync so that we get notify of an error.
- sync();
}
protected final boolean tagLE(long tag1, long tag2)
@@ -1058,9 +1020,11 @@ public class AMQSession_0_10 extends AMQ
code = ee.getErrorCode().getValue();
}
AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+ _connection.exceptionReceived(amqe);
+
_currentException = amqe;
}
- _connection.exceptionReceived(_currentException);
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1104,37 +1068,22 @@ public class AMQSession_0_10 extends AMQ
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
{
boolean match = true;
- try
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
+
+ if (match && assertNode)
{
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
-
- if (match && assertNode)
- {
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
- }
- else if (match)
- {
- // should I use the queried details to update the local data structure.
- }
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
}
- catch(SessionException e)
+ else if (match)
{
- if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
- {
- match = false;
- }
- else
- {
- throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
- "Error querying queue",e);
- }
+ // should I use the queried details to update the local data structure.
}
return match;
@@ -1200,22 +1149,6 @@ public class AMQSession_0_10 extends AMQ
int type = resolveAddressType(dest);
- if (type == AMQDestination.QUEUE_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.UNRELIABLE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
- {
- throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
- }
-
switch (type)
{
case AMQDestination.QUEUE_TYPE:
@@ -1229,8 +1162,6 @@ public class AMQSession_0_10 extends AMQ
{
setLegacyFiledsForQueueType(dest);
send0_10QueueDeclare(dest,null,false,noWait);
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
break;
}
}
@@ -1339,8 +1270,6 @@ public class AMQSession_0_10 extends AMQ
dest.getQueueName(),// should have one by now
dest.getSubject(),
Collections.<String,Object>emptyMap()));
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
}
public void setLegacyFiledsForQueueType(AMQDestination dest)
@@ -1378,26 +1307,5 @@ public class AMQSession_0_10 extends AMQ
sb.append(">");
return sb.toString();
}
-
- protected void acknowledgeImpl()
- {
- RangeSet range = gatherUnackedRangeSet();
-
- if(range.size() > 0 )
- {
- messageAcknowledge(range, true);
- getQpidSession().sync();
- }
- }
-
- @Override
- void resubscribe() throws AMQException
- {
- // Also reset the delivery tag tracker, to insure we dont
- // return the first <total number of msgs received on session>
- // messages sent by the brokers following the first rollback
- // after failover
- _highestDeliveryTag.set(-1);
- super.resubscribe();
- }
+
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 21 01:19:00 2011
@@ -38,7 +38,6 @@ import org.apache.qpid.client.message.Re
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
@@ -76,12 +75,12 @@ import org.apache.qpid.framing.amqp_0_91
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -91,7 +90,7 @@ public final class AMQSession_0_8 extend
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
+ * @param acknowledgeMode The acknoledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
@@ -109,7 +108,7 @@ public final class AMQSession_0_8 extend
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
+ * @param acknowledgeMode The acknoledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
@@ -125,20 +124,6 @@ public final class AMQSession_0_8 extend
return getProtocolHandler().getProtocolVersion();
}
- protected void acknowledgeImpl()
- {
- while (true)
- {
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
- }
-
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -168,7 +153,7 @@ public final class AMQSession_0_8 extend
// we also need to check the state manager for 08/09 as the
// _connection variable may not be updated in time by the error receiving
// thread.
- // We can't close the session if we are already in the process of
+ // We can't close the session if we are alreadying in the process of
// closing/closed the connection.
if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -184,20 +169,8 @@ public final class AMQSession_0_8 extend
}
}
- public void commitImpl() throws AMQException, FailoverException, TransportException
+ public void sendCommit() throws AMQException, FailoverException
{
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
-
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
@@ -427,12 +400,12 @@ public final class AMQSession_0_8 extend
public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, long producerId) throws JMSException
+ final boolean immediate, final boolean waitUntilSent, long producerId) throws JMSException
{
try
{
return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
- this, getProtocolHandler(), producerId, immediate, mandatory);
+ this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
catch (AMQException e)
{
@@ -604,18 +577,6 @@ public final class AMQSession_0_8 extend
}
- @Override
- protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
- throws JMSException
- {
- // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
- // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
- // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the
- // client explicitly deletes it.
-
- /* intentional no-op */
- }
-
public boolean isQueueBound(String exchangeName, String queueName,
String bindingKey, Map<String, Object> args) throws JMSException
{
@@ -623,34 +584,4 @@ public final class AMQSession_0_8 extend
queueName == null ? null : new AMQShortString(queueName),
bindingKey == null ? null : new AMQShortString(bindingKey));
}
-
-
- public AMQException getLastException()
- {
- // if the Connection has closed then we should throw any exception that
- // has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler()
- .getStateManager();
-
- Exception e = manager.getLastException();
- if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED)
- && e != null)
- {
- if (e instanceof AMQException)
- {
- return (AMQException) e;
- }
- else
- {
- AMQException amqe = new AMQException(AMQConstant
- .getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
- e.getMessage(), e.getCause());
- return amqe;
- }
- }
- else
- {
- return null;
- }
- }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Fri Oct 21 01:19:00 2011
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.client;
-import java.util.UUID;
-
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
import org.apache.qpid.framing.AMQShortString;
+import java.util.Random;
+import java.util.UUID;
+
/** AMQ implementation of a TemporaryQueue. */
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
@@ -49,15 +50,11 @@ final class AMQTemporaryQueue extends AM
{
throw new JMSException("Temporary Queue has consumers so cannot be deleted");
}
+ _deleted = true;
- try
- {
- _session.deleteTemporaryDestination(this);
- }
- finally
- {
- _deleted = true;
- }
+ // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue. This is probably not
+ // quite right for JMSCompliance.
}
public AMQSession getSession()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Oct 21 01:19:00 2011
@@ -53,14 +53,10 @@ class AMQTemporaryTopic extends AMQTopic
throw new JMSException("Temporary Topic has consumers so cannot be deleted");
}
- try
- {
- _session.deleteTemporaryDestination(this);
- }
- finally
- {
- _deleted = true;
- }
+ _deleted = true;
+ // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue. This is probably not
+ // quite right for JMSCompliance.
}
public AMQSession getSession()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Fri Oct 21 01:19:00 2011
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
-import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Topic;
@@ -96,47 +95,39 @@ public class AMQTopic extends AMQDestina
super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
}
- public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection)
+ public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
- if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
+ if (topic.getDestSyntax() == DestSyntax.ADDR)
{
- AMQDestination qpidTopic = (AMQDestination)topic;
- if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
+ try
{
- try
- {
- AMQTopic t = new AMQTopic(qpidTopic.getAddress());
- AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
- // link is never null if dest was created using an address string.
- t.getLink().setName(queueName.asString());
- t.getSourceNode().setAutoDelete(false);
- t.getSourceNode().setDurable(true);
-
- // The legacy fields are also populated just in case.
- t.setQueueName(queueName);
- t.setAutoDelete(false);
- t.setDurable(true);
- return t;
- }
- catch(Exception e)
- {
- JMSException ex = new JMSException("Error creating durable topic");
- ex.initCause(e);
- ex.setLinkedException(e);
- throw ex;
- }
+ AMQTopic t = new AMQTopic(topic.getAddress());
+ AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
+ // link is never null if dest was created using an address string.
+ t.getLink().setName(queueName.asString());
+ t.getSourceNode().setAutoDelete(false);
+ t.getSourceNode().setDurable(true);
+
+ // The legacy fields are also populated just in case.
+ t.setQueueName(queueName);
+ t.setAutoDelete(false);
+ t.setDurable(true);
+ return t;
}
- else
+ catch(Exception e)
{
- return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
- getDurableTopicQueueName(subscriptionName, connection),
- true);
+ JMSException ex = new JMSException("Error creating durable topic");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
}
}
else
{
- throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic");
+ return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
+ getDurableTopicQueueName(subscriptionName, connection),
+ true);
}
}
@@ -147,17 +138,13 @@ public class AMQTopic extends AMQDestina
public String getTopicName() throws JMSException
{
- if (getRoutingKey() != null)
+ if (super.getRoutingKey() == null && super.getSubject() != null)
{
- return getRoutingKey().asString();
- }
- else if (getSubject() != null)
- {
- return getSubject();
+ return super.getSubject();
}
else
{
- return null;
+ return super.getRoutingKey().toString();
}
}
@@ -176,18 +163,12 @@ public class AMQTopic extends AMQDestina
public AMQShortString getRoutingKey()
{
- if (super.getRoutingKey() != null)
- {
- return super.getRoutingKey();
- }
- else if (getSubject() != null)
+ if (super.getRoutingKey() == null && super.getSubject() != null)
{
- return new AMQShortString(getSubject());
+ return new AMQShortString(super.getSubject());
}
else
{
- setRoutingKey(new AMQShortString(""));
- setSubject("");
return super.getRoutingKey();
}
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Oct 21 01:19:00 2011
@@ -27,7 +27,6 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +36,10 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.SortedSet;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -115,10 +117,29 @@ public abstract class BasicMessageConsum
protected final int _acknowledgeMode;
/**
+ * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ */
+ private int _outstanding;
+
+ /**
+ * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
+ * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ */
+ private boolean _dups_ok_acknowledge_send;
+
+ /**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /** The last tag that was "multiple" acknowledged on this session (if transacted) */
+ private long _lastAcked;
+
+ /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
+ private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+ private final Object _commitLock = new Object();
+
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -268,6 +289,17 @@ public abstract class BasicMessageConsum
}
}
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ {
+ if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+ }
+
+ _session.setInRecovery(false);
+ preDeliver(jmsMsg);
+ }
+
/**
* @param immediate if true then return immediately if the connection is failing over
*
@@ -290,14 +322,14 @@ public abstract class BasicMessageConsum
}
}
- if (isMessageListenerSet())
+ if (!_receiving.compareAndSet(false, true))
{
- throw new javax.jms.IllegalStateException("A listener has already been set.");
+ throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
- if (!_receiving.compareAndSet(false, true))
+ if (isMessageListenerSet())
{
- throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
}
_receivingThread = Thread.currentThread();
@@ -376,7 +408,7 @@ public abstract class BasicMessageConsum
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preDeliver(m);
+ preApplicationProcessing(m);
postDeliver(m);
}
return m;
@@ -387,10 +419,6 @@ public abstract class BasicMessageConsum
return null;
}
- catch(TransportException e)
- {
- throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
- }
finally
{
releaseReceiving();
@@ -449,7 +477,7 @@ public abstract class BasicMessageConsum
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preDeliver(m);
+ preApplicationProcessing(m);
postDeliver(m);
}
@@ -461,10 +489,6 @@ public abstract class BasicMessageConsum
return null;
}
- catch(TransportException e)
- {
- throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
- }
finally
{
releaseReceiving();
@@ -547,7 +571,6 @@ public abstract class BasicMessageConsum
if (!_session.isClosed() || _session.isClosing())
{
sendCancel();
- cleanupQueue();
}
}
catch (AMQException e)
@@ -558,10 +581,6 @@ public abstract class BasicMessageConsum
{
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
- catch (TransportException e)
- {
- throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
- }
}
}
else
@@ -589,8 +608,6 @@ public abstract class BasicMessageConsum
}
abstract void sendCancel() throws AMQException, FailoverException;
-
- abstract void cleanupQueue() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
@@ -701,7 +718,7 @@ public abstract class BasicMessageConsum
{
if (isMessageListenerSet())
{
- preDeliver(jmsMessage);
+ preApplicationProcessing(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -725,42 +742,49 @@ public abstract class BasicMessageConsum
}
}
- protected void preDeliver(AbstractJMSMessage msg)
+ void preDeliver(AbstractJMSMessage msg)
{
- _session.setInRecovery(false);
-
switch (_acknowledgeMode)
{
+
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
- case Session.AUTO_ACKNOWLEDGE:
- //fall through
- case Session.DUPS_OK_ACKNOWLEDGE:
- _session.addUnacknowledgedMessage(msg.getDeliveryTag());
- break;
+
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
msg.setAMQSession(_session);
- _session.addUnacknowledgedMessage(msg.getDeliveryTag());
- _session.markDirty();
break;
case Session.SESSION_TRANSACTED:
- _session.addDeliveredMessage(msg.getDeliveryTag());
- _session.markDirty();
- break;
- case Session.NO_ACKNOWLEDGE:
- //do nothing.
- //path used for NO-ACK consumers, and browsers (see constructor).
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ }
+
break;
}
+
}
- void postDeliver(AbstractJMSMessage msg)
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
{
switch (_acknowledgeMode)
{
+
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ _session.markDirty();
+ break;
+
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
@@ -798,6 +822,63 @@ public abstract class BasicMessageConsum
return null;
}
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ */
+ void acknowledgeDelivered()
+ {
+ synchronized(_commitLock)
+ {
+ ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ tagsToAck.add(_receivedDeliveryTags.poll());
+ }
+
+ Collections.sort(tagsToAck);
+
+ long prevAcked = _lastAcked;
+ long oldAckPoint = -1;
+
+ while(oldAckPoint != prevAcked)
+ {
+ oldAckPoint = prevAcked;
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
+ {
+ tagsToAckIterator.remove();
+ prevAcked++;
+ }
+
+ Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
+ while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
+ {
+ previousAckIterator.remove();
+ prevAcked++;
+ }
+
+ }
+ if(prevAcked != _lastAcked)
+ {
+ _session.acknowledgeMessage(prevAcked, true);
+ _lastAcked = prevAcked;
+ }
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext())
+ {
+ Long tag = tagsToAckIterator.next();
+ _session.acknowledgeMessage(tag, false);
+ _previouslyAcked.add(tag);
+ }
+ }
+ }
+
+
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -876,7 +957,7 @@ public abstract class BasicMessageConsum
public boolean isNoConsume()
{
- return _noConsume;
+ return _noConsume || _destination.isBrowseOnly() ;
}
public void rollback()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org