You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/11 13:39:10 UTC
svn commit: r574555 - in /incubator/qpid/branches/M2.1/java:
client/src/main/java/org/apache/qpid/client/
systests/src/main/java/org/apache/qpid/server/failure/
Author: ritchiem
Date: Tue Sep 11 04:39:10 2007
New Revision: 574555
URL: http://svn.apache.org/viewvc?rev=574555&view=rev
Log:
QPID-590 : Provide test case and resolution to prevent deadlock occurring on the client when two threads work on the AMQSession object.
Added:
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java (with props)
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=574555&r1=574554&r2=574555&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Sep 11 04:39:10 2007
@@ -72,7 +72,6 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +99,6 @@
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
-
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -206,14 +204,14 @@
* subscriptions between executions of the client.
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
- new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+ new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
* Used to hold incoming messages.
@@ -241,11 +239,11 @@
* consumer.
*/
private Map<AMQShortString, BasicMessageConsumer> _consumers =
- new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
- new ConcurrentHashMap<Destination, AtomicInteger>();
+ new ConcurrentHashMap<Destination, AtomicInteger>();
/**
* Used as a source of unique identifiers for producers within the session.
@@ -305,15 +303,15 @@
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
- Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
+ Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
_immediatePrefetch =
- _strictAMQP
- || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+ _strictAMQP
+ || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
_connection = con;
_transacted = transacted;
@@ -334,31 +332,31 @@
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
- new FlowControllingBlockingQueue.ThresholdListener()
- {
- public void aboveThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
- }
- }
-
- public void underThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
- }
- }
- });
+ new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+ new FlowControllingBlockingQueue.ThresholdListener()
+ {
+ public void aboveThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(true)).start();
+ }
+ }
+
+ public void underThreshold(int currentValue)
+ {
+ if (_acknowledgeMode == NO_ACKNOWLEDGE)
+ {
+ _logger.debug(
+ "Below threshold(" + _defaultPrefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(false)).start();
+ }
+ }
+ });
}
else
{
@@ -377,10 +375,10 @@
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
- int defaultPrefetchLow)
+ int defaultPrefetchLow)
{
this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
- defaultPrefetchLow);
+ defaultPrefetchLow);
}
// ===== JMS Session methods.
@@ -435,8 +433,8 @@
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
final AMQFrame ackFrame =
- BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
- multiple);
+ BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ multiple);
if (_logger.isDebugEnabled())
{
@@ -463,27 +461,27 @@
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException
+ final AMQShortString exchangeName) throws AMQException
{
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
{
- public Object execute() throws AMQException, FailoverException
- {
- AMQFrame queueBind =
+ AMQFrame queueBind =
QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- arguments, // arguments
- exchangeName, // exchange
- false, // nowait
- queueName, // queue
- routingKey, // routingKey
- getTicket()); // ticket
+ arguments, // arguments
+ exchangeName, // exchange
+ false, // nowait
+ queueName, // queue
+ routingKey, // routingKey
+ getTicket()); // ticket
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
- return null;
- }
- }, _connection).execute();
+ return null;
+ }
+ }, _connection).execute();
}
/**
@@ -510,59 +508,59 @@
if (_logger.isInfoEnabled())
{
_logger.info("Closing session: " + this + ":"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
- synchronized(_messageDeliveryLock)
+ synchronized (_messageDeliveryLock)
{
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_connection.getFailoverMutex())
- {
- // Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (_connection.getFailoverMutex())
{
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
+ // Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
- getProtocolHandler().closeSession(this);
+ try
+ {
- final AMQFrame frame =
- ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
+ getProtocolHandler().closeSession(this);
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+ final AMQFrame frame =
+ ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client closing channel")); // replyText
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully.
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- // This is ignored because the channel is already marked as closed so the fail-over process will
- // not re-open it.
- catch (FailoverException e)
- {
- _logger.debug(
- "Got FailoverException during channel close, ignored as channel already marked as closed.");
- }
- finally
- {
- _connection.deregisterSession(_channelId);
+ getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ // This is ignored because the channel is already marked as closed so the fail-over process will
+ // not re-open it.
+ catch (FailoverException e)
+ {
+ _logger.debug(
+ "Got FailoverException during channel close, ignored as channel already marked as closed.");
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
+ }
}
}
}
- }
}
/**
@@ -572,26 +570,26 @@
*/
public void closed(Throwable e) throws JMSException
{
- synchronized(_messageDeliveryLock)
+ synchronized (_messageDeliveryLock)
{
- synchronized (_connection.getFailoverMutex())
- {
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- _closed.set(true);
- AMQException amqe;
- if (e instanceof AMQException)
+ synchronized (_connection.getFailoverMutex())
{
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ _closed.set(true);
+ AMQException amqe;
+ if (e instanceof AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
- }
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
+ }
}
}
@@ -626,7 +624,7 @@
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
- TxCommitOkBody.class);
+ TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -709,12 +707,12 @@
}
public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
+ throws JMSException
{
checkValidDestination(destination);
return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
- messageSelector, null, true, true);
+ messageSelector, null, true, true);
}
public MessageConsumer createConsumer(Destination destination) throws JMSException
@@ -722,7 +720,7 @@
checkValidDestination(destination);
return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
- false, false);
+ false, false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -730,20 +728,20 @@
checkValidDestination(destination);
return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
- messageSelector, null, false, false);
+ messageSelector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
+ throws JMSException
{
checkValidDestination(destination);
return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
- messageSelector, null, false, false);
+ messageSelector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
- String selector) throws JMSException
+ String selector) throws JMSException
{
checkValidDestination(destination);
@@ -751,7 +749,7 @@
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector) throws JMSException
+ boolean exclusive, String selector) throws JMSException
{
checkValidDestination(destination);
@@ -759,7 +757,7 @@
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
- String selector, FieldTable rawSelector) throws JMSException
+ String selector, FieldTable rawSelector) throws JMSException
{
checkValidDestination(destination);
@@ -767,12 +765,12 @@
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
- boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
+ boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
- false);
+ false);
}
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -787,7 +785,7 @@
if (subscriber.getTopic().equals(topic))
{
throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
+ + name);
}
else
{
@@ -815,7 +813,7 @@
else
{
_logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
}
deleteQueue(dest.getAMQQueueName());
@@ -825,7 +823,7 @@
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -842,7 +840,7 @@
/** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
- throws JMSException
+ throws JMSException
{
checkNotClosed();
checkValidTopic(topic);
@@ -899,13 +897,13 @@
}
public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
- throws JMSException
+ throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
- boolean waitUntilSent) throws JMSException
+ boolean waitUntilSent) throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
}
@@ -955,28 +953,28 @@
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive) throws AMQException
+ final boolean exclusive) throws AMQException
{
new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
{
- public Object execute() throws AMQException, FailoverException
- {
- AMQFrame queueDeclare =
+ AMQFrame queueDeclare =
QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- autoDelete, // autoDelete
- durable, // durable
- exclusive, // exclusive
- false, // nowait
- false, // passive
- name, // queue
- getTicket()); // ticket
+ null, // arguments
+ autoDelete, // autoDelete
+ durable, // durable
+ exclusive, // exclusive
+ false, // nowait
+ false, // passive
+ name, // queue
+ getTicket()); // ticket
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
- return null;
- }
- }, _connection).execute();
+ return null;
+ }
+ }, _connection).execute();
}
/**
@@ -1269,8 +1267,8 @@
if (_logger.isDebugEnabled())
{
_logger.debug("Message["
- + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
- + "] received in session with channel id " + _channelId);
+ + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
+ + "] received in session with channel id " + _channelId);
}
if (message.getDeliverBody() == null)
@@ -1343,15 +1341,15 @@
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+ getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
{
_connection.getProtocolHandler().syncWrite(
- BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
- , BasicRecoverOkBody.class);
+ BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+ , BasicRecoverOkBody.class);
}
if (!isSuspended)
@@ -1401,8 +1399,8 @@
}
AMQFrame basicRejectBody =
- BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
- requeue);
+ BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ requeue);
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
@@ -1442,7 +1440,7 @@
}
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
if (!isSuspended)
{
@@ -1521,7 +1519,7 @@
else
{
_logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
- + " Requesting queue deletion regardless.");
+ + " Requesting queue deletion regardless.");
}
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
@@ -1542,8 +1540,8 @@
}
protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose) throws JMSException
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+ final boolean noConsume, final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -1586,9 +1584,9 @@
}
BasicMessageConsumer consumer =
- new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
- _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
- exclusive, _acknowledgeMode, noConsume, autoClose);
+ new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
+ _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
+ exclusive, _acknowledgeMode, noConsume, autoClose);
if (_messageListener != null)
{
@@ -1608,7 +1606,7 @@
catch (AMQInvalidRoutingKeyException e)
{
JMSException ide =
- new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
+ new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
ide.setLinkedException(e);
throw ide;
}
@@ -1694,26 +1692,26 @@
* @todo Be aware of possible changes to parameter order as versions change.
*/
boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
+ throws JMSException
{
try
{
AMQMethodEvent response =
- new FailoverRetrySupport<AMQMethodEvent, AMQException>(
- new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
- {
- public AMQMethodEvent execute() throws AMQException, FailoverException
- {
- AMQFrame boundFrame =
- ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
- getProtocolMinorVersion(), exchangeName, // exchange
- queueName, // queue
- routingKey); // routingKey
+ new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ AMQFrame boundFrame =
+ ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
+ getProtocolMinorVersion(), exchangeName, // exchange
+ queueName, // queue
+ routingKey); // routingKey
- return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
- }
- }, _connection).execute();
+ }
+ }, _connection).execute();
// Extract and return the response code from the query.
ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -1783,9 +1781,16 @@
}
}
- synchronized void startDistpatcherIfNecessary()
+ void startDistpatcherIfNecessary()
{
+ //If we are the dispatcher then we don't need to check we are started
+ if (Thread.currentThread() == _dispatcher)
+ {
+ return;
+ }
+
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
+ // This is final per session so will be multi-thread safe.
if (!_immediatePrefetch)
{
// We do this now if this is the first call on a started connection
@@ -1922,14 +1927,14 @@
if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this))
{
throw new javax.jms.InvalidDestinationException(
- "Cannot create a subscription on a temporary topic created in another session");
+ "Cannot create a subscription on a temporary topic created in another session");
}
if (!(topic instanceof AMQTopic))
{
throw new javax.jms.InvalidDestinationException(
- "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
- + topic.getClass().getName());
+ "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+ + topic.getClass().getName());
}
return (AMQTopic) topic;
@@ -2029,7 +2034,7 @@
* @param queueName
*/
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
// need to generate a consumer tag on the client so we can exploit the nowait flag
AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
@@ -2058,14 +2063,14 @@
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame jmsConsume =
- BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- getTicket()); // ticket
+ BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ getTicket()); // ticket
if (nowait)
{
@@ -2085,13 +2090,13 @@
}
private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
- throws JMSException
+ throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent) throws JMSException
+ final boolean immediate, final boolean waitUntilSent) throws JMSException
{
return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
@@ -2101,8 +2106,8 @@
checkNotClosed();
long producerId = getNextProducerId();
BasicMessageProducer producer =
- new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
- AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
+ AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
registerProducer(producerId, producer);
return producer;
@@ -2130,29 +2135,29 @@
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+ final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
{
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
{
- public Object execute() throws AMQException, FailoverException
- {
- AMQFrame exchangeDeclare =
+ AMQFrame exchangeDeclare =
ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- nowait, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
+ null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ nowait, // nowait
+ false, // passive
+ getTicket(), // ticket
+ type); // type
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
- return null;
- }
- }, _connection).execute();
+ return null;
+ }
+ }, _connection).execute();
}
/**
@@ -2177,7 +2182,7 @@
* @todo Be aware of possible changes to parameter order as versions change.
*/
private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
- throws AMQException
+ throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
return new FailoverNoopSupport<AMQShortString, AMQException>(
@@ -2192,15 +2197,15 @@
}
AMQFrame queueDeclare =
- QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- false, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- getTicket()); // ticket
+ QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ false, // nowait
+ false, // passive
+ amqd.getAMQQueueName(), // queue
+ getTicket()); // ticket
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
@@ -2225,22 +2230,22 @@
try
{
new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
{
- public Object execute() throws AMQException, FailoverException
- {
- AMQFrame queueDeleteFrame =
+ AMQFrame queueDeleteFrame =
QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- getTicket()); // ticket
+ false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ getTicket()); // ticket
- getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
- return null;
- }
- }, _connection).execute();
+ return null;
+ }
+ }, _connection).execute();
}
catch (AMQException e)
{
@@ -2359,7 +2364,7 @@
{
suspendChannel(true);
_logger.info(
- "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
+ "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (AMQException e)
{
@@ -2408,7 +2413,7 @@
if (_logger.isInfoEnabled())
{
_logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
- + requeue);
+ + requeue);
if (messages.hasNext())
{
@@ -2428,7 +2433,7 @@
if (_logger.isDebugEnabled())
{
_logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliverBody().deliveryTag);
+ + message.getDeliverBody().deliveryTag);
}
messages.remove();
@@ -2469,44 +2474,44 @@
private void returnBouncedMessage(final UnprocessedMessage message)
{
_connection.performConnectionTask(new Runnable()
+ {
+ public void run()
{
- public void run()
+ try
{
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
_messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
- message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
+ message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+ AMQShortString reason = message.getBounceBody().replyText;
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
- catch (Exception e)
+ else if (errorCode == AMQConstant.NO_ROUTE)
{
- _logger.error(
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
"Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
e);
- }
}
- });
+ }
+ });
}
/**
@@ -2533,8 +2538,8 @@
_suspended = suspend;
AMQFrame channelFlowFrame =
- ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- !suspend);
+ ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ !suspend);
_connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
@@ -2670,7 +2675,7 @@
_lock.wait();
}
- synchronized(_messageDeliveryLock)
+ synchronized (_messageDeliveryLock)
{
dispatchMessage(message);
}
@@ -2713,7 +2718,7 @@
if (_dispatcherLogger.isDebugEnabled())
{
_dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started")
- + ": Currently " + (currently ? "Stopped" : "Started"));
+ + ": Currently " + (currently ? "Stopped" : "Started"));
}
}
@@ -2725,7 +2730,7 @@
if (message.getDeliverBody() != null)
{
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+ (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
if ((consumer == null) || consumer.isClosed())
{
@@ -2734,14 +2739,14 @@
if (consumer == null)
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().deliveryTag + "] from queue "
- + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+ + message.getDeliverBody().deliveryTag + "] from queue "
+ + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
}
else
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
- + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
+ + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+ + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
// Don't reject if we're already closing
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=574555&r1=574554&r2=574555&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Sep 11 04:39:10 2007
@@ -240,15 +240,12 @@
if (messageListener != null)
{
- // handle case where connection has already been started, and the dispatcher has alreaded started
+ //todo: handle case where connection has already been started, and the dispatcher has alreaded started
// putting values on the _synchronousQueue
- synchronized (_session)
- {
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDistpatcherIfNecessary();
- }
}
}
}
Added: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java?rev=574555&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java (added)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java Tue Sep 11 04:39:10 2007
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.failure;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DeadlockTestCase:
+ * From a client requirement.
+ *
+ * The JMS Spec specifies that a Session has a single thread of control. And as such setting message listeners from a
+ * second thread is not allowed.
+ * Section 4.4.6 of the Spec states:
+ <quote>Another consequence is that a connection must be in stopped mode to set up a
+session with more than one message listener. The reason is that when a
+connection is actively delivering messages, once the first message listener for a
+session has been registered, the session is now controlled by the thread of
+control that delivers messages to it. At this point a client thread of control
+cannot be used to further configure the session.</quote>
+ *
+ * It, however, does not specified what we should do in the case. it only states:
+ <quote>Once a connection has been started, all its sessions with a registered message
+listener are dedicated to the thread of control that delivers messages to them. It
+is erroneous for client code to use such a session from another thread of
+control. The only exception to this is the use of the session or connection close
+method.</quote>
+ *
+ * While it may be erroneous the causing a Deadlock is not a very satisfactory solution. This test ensures that we do
+ * no do this. There is no technical reason we cannot currently allow the setting of a messageListener on a new consumer.
+ * The only caveate is due to QPID-577 there is likely to be temporary message 'loss'. As they are stuck on the internal
+ * _synchronousQueue pending a synchronous receive.
+ *
+ */
+public class DeadlockTest extends TestCase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(DeadlockTest.class);
+
+
+ public static final String QPID_BROKER_CONNECTION_PROPERTY = "QPIDBROKER";
+
+ private String topic1 = "TEST.DeadLock1.TMP";
+ private String topic2 = "TEST.DeadLock2.TMP";
+
+ private Session sess;
+
+ private Semaphore s = new Semaphore(0);
+ private final String LOCAL = "tcp://localhost:5670";
+ private final String VM = "vm://:1";
+
+ private String BROKER = VM;
+
+ String connectionString = System.getProperty(QPID_BROKER_CONNECTION_PROPERTY,
+ "amqp://guest:guest@/test?brokerlist='" + BROKER + "'");
+
+
+ public void setUp() throws AMQVMBrokerCreationException
+ {
+ if (BROKER.equals("vm://:1"))
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ }
+
+ public void tearDown() throws AMQVMBrokerCreationException
+ {
+ if (BROKER.equals("vm://:1"))
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+ }
+
+ public class EmptyMessageListener implements javax.jms.MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ // do nothing
+ }
+ }
+
+ public void setSessionListener(String topic, javax.jms.MessageListener listener)
+ {
+ try
+ {
+ Topic jmsTopic = sess.createTopic(topic);
+ MessageConsumer subscriber = sess.createConsumer(jmsTopic);
+ subscriber.setMessageListener(listener);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("Caught JMSException");
+ }
+ }
+
+ public class TestMessageListener implements javax.jms.MessageListener
+ {
+ public Random r = new Random();
+
+ public void onMessage(Message message)
+ {
+ if (r.nextBoolean())
+ {
+ setSessionListener(topic2, new EmptyMessageListener());
+ }
+ }
+
+ }
+
+ public void testDeadlock() throws InterruptedException, URLSyntaxException, JMSException
+ {
+ // in order to trigger the deadlock we need to
+ // set a message listener from one thread
+ // whilst receiving a message on another thread and on that thread also setting (the same?) message listener
+ AMQConnectionFactory acf = new AMQConnectionFactory(connectionString);
+ Connection conn = acf.createConnection();
+ conn.start();
+ sess = conn.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ setSessionListener(topic1, new TestMessageListener());
+
+
+ Thread th = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ Topic jmsTopic = sess.createTopic(topic1);
+ MessageProducer producer = sess.createProducer(jmsTopic);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ Random r = new Random();
+ long end = System.currentTimeMillis() + 2000;
+ while (end - System.currentTimeMillis() > 0)
+ {
+ if (r.nextBoolean())
+ {
+ _logger.info("***************** send message");
+ Message jmsMessage = sess.createTextMessage("");
+ producer.send(jmsMessage);
+ }
+ else
+ {
+ _logger.info("***************** set session listener");
+ setSessionListener(topic2, new EmptyMessageListener());
+ }
+ Thread.yield();
+ }
+ _logger.info("done sends");
+ s.release();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("Caught JMSException");
+ }
+ }
+ };
+ th.setDaemon(true);
+ th.setName("testDeadlock");
+ th.start();
+
+ boolean success = s.tryAcquire(1, 4, TimeUnit.SECONDS);
+
+ // if we failed, closing the connection will just hang the test case.
+ if (success)
+ {
+ conn.close();
+ }
+
+ if (!success)
+ {
+ fail("Deadlock ocurred");
+ }
+ }
+}
Propchange: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date