You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/16 00:23:52 UTC
svn commit: r508235 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/que...
Author: rgodfrey
Date: Thu Feb 15 15:23:48 2007
New Revision: 508235
URL: http://svn.apache.org/viewvc?view=rev&rev=508235
Log:
QPID-366: Reference counts not being decremented correctly, and other persistence issues
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 15 15:23:48 2007
@@ -43,6 +43,7 @@
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -112,7 +113,7 @@
* A context used by the message store enabling it to track context for a given channel even across
* thread boundaries
*/
- private final StoreContext _storeContext = new StoreContext();
+ private final StoreContext _storeContext;
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
@@ -120,12 +121,16 @@
private Set<Long> _browsedAcks = new HashSet<Long>();
+ private final AMQProtocolSession _session;
- public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+ public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
+ _session = session;
_channelId = channelId;
+ _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
@@ -338,7 +343,8 @@
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
- _txnContext.commit();
+ _txnContext.commit();
+
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -386,8 +392,10 @@
_txnContext.deliver(unacked.message, unacked.queue);
}
}
+
}
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
@@ -403,7 +411,7 @@
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
{
msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
}
@@ -417,6 +425,7 @@
msgToRequeue.add(message);
}
}
+
// false means continue processing
return false;
}
@@ -430,6 +439,7 @@
{
_txnContext.deliver(message.message, message.queue);
_unacknowledgedMessageMap.remove(message.deliveryTag);
+ message.message.decrementReference(_storeContext);
}
}
@@ -559,6 +569,8 @@
public void rollback() throws AMQException
{
_txnContext.rollback();
+
+
}
public String toString()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Thu Feb 15 15:23:48 2007
@@ -100,6 +100,7 @@
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
+ msg.restoreTransientMessageData();
msg.discard(storeContext);
}
}
@@ -112,6 +113,7 @@
//in memory (persistent changes will be rolled back by store)
for (UnacknowledgedMessage msg : _unacked)
{
+ msg.clearTransientMessageData();
msg.message.incrementReference();
}
}
@@ -120,6 +122,11 @@
{
//remove the unacked messages from the channels map
_map.remove(_unacked);
+ for (UnacknowledgedMessage msg : _unacked)
+ {
+ msg.clearTransientMessageData();
+ }
+
}
public void rollback(StoreContext storeContext)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Thu Feb 15 15:23:48 2007
@@ -50,5 +50,15 @@
}
message.decrementReference(storeContext);
}
+
+ public void restoreTransientMessageData() throws AMQException
+ {
+ message.restoreTransientMessageData();
+ }
+
+ public void clearTransientMessageData()
+ {
+ message.clearTransientMessageData();
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Thu Feb 15 15:23:48 2007
@@ -49,7 +49,7 @@
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(),
+ final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb 15 15:23:48 2007
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -111,7 +108,7 @@
{
try
{
- return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
}
catch (AMQException e)
{
@@ -124,7 +121,7 @@
{
try
{
- ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
return ContentBody.createAMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -141,6 +138,11 @@
}
}
+ private StoreContext getStoreContext()
+ {
+ return _txnContext.getStoreContext();
+ }
+
private class BodyContentIterator implements Iterator<ContentBody>
{
@@ -150,7 +152,7 @@
{
try
{
- return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
}
catch (AMQException e)
{
@@ -163,7 +165,7 @@
{
try
{
- return _messageHandle.getContentBody(_messageId, ++_index);
+ return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
}
catch (AMQException e)
{
@@ -201,10 +203,11 @@
* @param factory
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
+ _txnContext = txnConext;
_transientMessageData = null;
}
@@ -276,7 +279,7 @@
}
else
{
- return _messageHandle.getContentHeaderBody(_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
}
}
@@ -342,14 +345,16 @@
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
}
}
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -365,7 +370,9 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
+
}
// must check if the handle is null since there may be cases where we decide to throw away a message
@@ -386,7 +393,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId);
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
@@ -475,7 +482,7 @@
}
else
{
- return _messageHandle.isPersistent(_messageId);
+ return _messageHandle.isPersistent(getStoreContext(),_messageId);
}
}
@@ -504,7 +511,7 @@
}
else
{
- pb = _messageHandle.getPublishBody(_messageId);
+ pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
}
return pb;
}
@@ -541,7 +548,7 @@
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
if (_log.isDebugEnabled())
{
- _log.debug("Delivering message " + _messageId);
+ _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
}
try
{
@@ -575,7 +582,7 @@
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -591,7 +598,7 @@
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -603,7 +610,7 @@
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(_messageId, i);
+ cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
}
@@ -619,7 +626,7 @@
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -634,7 +641,7 @@
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -646,7 +653,7 @@
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(_messageId, i);
+ cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
}
@@ -749,11 +756,28 @@
}
catch (AMQException e)
{
- _log.error(e);
+ _log.error(e.toString(),e);
return 0;
}
}
+
+
+
+ public void restoreTransientMessageData() throws AMQException
+ {
+ TransientMessageData transientMessageData = new TransientMessageData();
+ transientMessageData.setPublishBody(getPublishBody());
+ transientMessageData.setContentHeaderBody(getContentHeaderBody());
+ transientMessageData.addBodyLength(getContentHeaderBody().getSize());
+ _transientMessageData = transientMessageData;
+ }
+
+
+ public void clearTransientMessageData()
+ {
+ _transientMessageData = null;
+ }
public String toString()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Thu Feb 15 15:23:48 2007
@@ -35,17 +35,17 @@
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException;
+ ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException;
/**
* @return the number of body frames associated with this message
*/
- int getBodyCount(Long messageId) throws AMQException;
+ int getBodyCount(StoreContext context, Long messageId) throws AMQException;
/**
* @return the size of the body
*/
- long getBodySize(Long messageId) throws AMQException;
+ long getBodySize(StoreContext context, Long messageId) throws AMQException;
/**
* Get a particular content body
@@ -53,17 +53,17 @@
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
- BasicPublishBody getPublishBody(Long messageId) throws AMQException;
+ BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
boolean isRedelivered();
void setRedelivered(boolean redelivered);
- boolean isPersistent(Long messageId) throws AMQException;
+ boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Thu Feb 15 15:23:48 2007
@@ -49,22 +49,22 @@
{
}
- public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
{
return _contentHeaderBody;
}
- public int getBodyCount(Long messageId)
+ public int getBodyCount(StoreContext context, Long messageId)
{
return _contentBodies.size();
}
- public long getBodySize(Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId) throws AMQException
{
- return getContentHeaderBody(messageId).bodySize;
+ return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -80,7 +80,7 @@
_contentBodies.add(contentBody);
}
- public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
{
return _publishBody;
}
@@ -96,10 +96,10 @@
_redelivered = redelivered;
}
- public boolean isPersistent(Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(messageId);
+ ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb 15 15:23:48 2007
@@ -267,9 +267,11 @@
if (_acks)
{
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ msg.decrementReference(storeContext);
}
msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+
}
}
finally
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Thu Feb 15 15:23:48 2007
@@ -56,21 +56,21 @@
_messageStore = messageStore;
}
- public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
{
ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
if (chb == null)
{
- MessageMetaData mmd = loadMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(context, messageId);
chb = mmd.getContentHeaderBody();
}
return chb;
}
- private MessageMetaData loadMessageMetaData(Long messageId)
+ private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId)
throws AMQException
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
populateFromMessageMetaData(mmd);
return mmd;
}
@@ -82,11 +82,11 @@
_publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
}
- public int getBodyCount(Long messageId) throws AMQException
+ public int getBodyCount(StoreContext context, Long messageId) throws AMQException
{
if (_contentBodies == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
int chunkCount = mmd.getContentChunkCount();
_contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
for (int i = 0; i < chunkCount; i++)
@@ -97,12 +97,12 @@
return _contentBodies.size();
}
- public long getBodySize(Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId) throws AMQException
{
- return getContentHeaderBody(messageId).bodySize;
+ return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -113,7 +113,7 @@
ContentBody cb = wr.get();
if (cb == null)
{
- cb = _messageStore.getContentBodyChunk(messageId, index);
+ cb = _messageStore.getContentBodyChunk(context, messageId, index);
_contentBodies.set(index, new WeakReference<ContentBody>(cb));
}
return cb;
@@ -145,12 +145,12 @@
_messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
}
- public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
{
BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
if (bpb == null)
{
- MessageMetaData mmd = loadMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(context, messageId);
bpb = mmd.getPublishBody();
}
@@ -167,10 +167,10 @@
_redelivered = redelivered;
}
- public boolean isPersistent(Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(messageId);
+ ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Feb 15 15:23:48 2007
@@ -174,12 +174,12 @@
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
{
return _metaDataMap.get(messageId);
}
- public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Feb 15 15:23:48 2007
@@ -84,8 +84,8 @@
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
- MessageMetaData getMessageMetaData(Long messageId) throws AMQException;
+ MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
- ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException;
+ ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Feb 15 15:23:48 2007
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store;
+import org.apache.log4j.Logger;
+
+
/**
* A context that the store can use to associate with a transactional context. For example, it could store
* some kind of txn id.
@@ -28,8 +31,22 @@
*/
public class StoreContext
{
+
+ private static final Logger _logger = Logger.getLogger(StoreContext.class);
+
+ private String _name;
private Object _payload;
+ public StoreContext()
+ {
+ _name = super.toString();
+ }
+
+ public StoreContext(String name)
+ {
+ _name = name;
+ }
+
public Object getPayload()
{
return _payload;
@@ -37,6 +54,7 @@
public void setPayload(Object payload)
{
+ _logger.debug("["+_name+"] Setting payload: " + payload);
_payload = payload;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Feb 15 15:23:48 2007
@@ -168,7 +168,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Starting transaction on message store");
+ _log.debug("Starting transaction on message store: " + this);
}
_messageStore.beginTran(_storeContext);
_inTran = true;
@@ -179,7 +179,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Committing transactional context");
+ _log.debug("Committing transactional context: " + this);
}
if (_ackOp != null)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Thu Feb 15 15:23:48 2007
@@ -75,7 +75,7 @@
}
else
{
- URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+ throw URLHelper.parseError(0, transport.length(), "Unknown transport", url);
}
}
}
@@ -89,7 +89,7 @@
if (transport == null)
{
- URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+ throw URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
" In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
}
@@ -144,7 +144,7 @@
}
else
{
- URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ throw URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
"Illegal character in port number", connection.toString());
}
@@ -172,7 +172,7 @@
throw(URLSyntaxException) uris;
}
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Thu Feb 15 15:23:48 2007
@@ -68,7 +68,7 @@
String uid = AMQConnectionFactory.getUniqueClientID();
if (uid == null)
{
- URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
}
else
{
@@ -106,7 +106,7 @@
if (userInfo == null)
{
- URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+ throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
"User information not found on url", fullURL);
}
else
@@ -126,11 +126,11 @@
int testIndex = start + authLength;
if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
{
- URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+ throw URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
}
else
{
- URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+ throw URLHelper.parseError(-1, "Virtual host not specified", fullURL);
}
}
@@ -155,17 +155,17 @@
if (slash == -1)
{
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
}
else
{
if (slash != 0 && fullURL.charAt(slash - 1) == ':')
{
- URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
}
else
{
- URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+ throw URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
}
}
@@ -180,7 +180,7 @@
if (colonIndex == -1)
{
- URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+ throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
"Null password in user information not allowed.", _url);
}
else
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Feb 15 15:23:48 2007
@@ -76,7 +76,7 @@
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
- _queueName = new AMQShortString(binding.getQueueName());
+ _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Thu Feb 15 15:23:48 2007
@@ -28,6 +28,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,7 +43,7 @@
/**
* This constant represents the name of a property that is set when the message payload is null.
*/
- private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
+ private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public JMSTextMessage() throws JMSException
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java Thu Feb 15 15:23:48 2007
@@ -107,7 +107,7 @@
public OneUseChannel(int channelId, VirtualHost virtualHost)
throws AMQException
{
- super(channelId,
+ super(ClusteredProtocolSession.this,channelId,
virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java Thu Feb 15 15:23:48 2007
@@ -26,9 +26,12 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.log4j.Logger;
public class AMQBindingURL implements BindingURL
{
+ private static final Logger _logger = Logger.getLogger(AMQBindingURL.class);
+
String _url;
AMQShortString _exchangeClass;
AMQShortString _exchangeName;
@@ -41,7 +44,7 @@
{
//format:
// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-
+ _logger.debug("Parsing URL: " + url);
_url = url;
_options = new HashMap<String, String>();
@@ -73,17 +76,19 @@
if (exchangeName == null)
{
- URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+ throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
}
else
{
setExchangeName(exchangeName);
}
+ String queueName;
+
if (connection.getPath() == null ||
connection.getPath().equals(""))
{
- URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
+ throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination or Queue requried", _url);
}
else
@@ -91,7 +96,7 @@
int slash = connection.getPath().indexOf("/", 1);
if (slash == -1)
{
- URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
+ throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination requried", _url);
}
else
@@ -99,7 +104,10 @@
String path = connection.getPath();
setDestinationName(path.substring(1, slash));
- setQueueName(path.substring(slash + 1));
+ // We don't set queueName yet as the actual value we use depends on options set
+ // when we are dealing with durable subscriptions
+
+ queueName = path.substring(slash + 1);
}
}
@@ -108,14 +116,19 @@
processOptions();
+ // We can now call setQueueName as the URL is fully parsed.
+
+ setQueueName(queueName);
+
//Fragment is #string (not used)
//System.out.println(connection.getFragment());
+ _logger.debug("URL Parsed: " + this);
}
catch (URISyntaxException uris)
{
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
}
}
@@ -125,7 +138,7 @@
setExchangeClass(new AMQShortString(exchangeClass));
}
- private void setQueueName(String name)
+ private void setQueueName(String name) throws URLSyntaxException
{
setQueueName(new AMQShortString(name));
}
@@ -155,8 +168,9 @@
return _exchangeClass;
}
- public void setExchangeClass(AMQShortString exchangeClass)
+ private void setExchangeClass(AMQShortString exchangeClass)
{
+
_exchangeClass = exchangeClass;
}
@@ -165,7 +179,7 @@
return _exchangeName;
}
- public void setExchangeName(AMQShortString name)
+ private void setExchangeName(AMQShortString name)
{
_exchangeName = name;
@@ -180,40 +194,43 @@
return _destinationName;
}
- public void setDestinationName(AMQShortString name)
+ private void setDestinationName(AMQShortString name)
{
_destinationName = name;
}
public AMQShortString getQueueName()
{
+ return _queueName;
+ }
+
+ public void setQueueName(AMQShortString name) throws URLSyntaxException
+ {
if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
{
if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
{
- return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
+ _queueName = new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
}
else
{
- return getDestinationName();
+ throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url);
+
}
}
else
{
- return getDestinationName();
+ _queueName = null;
}
}
else
{
- return _queueName;
+ _queueName = name;
}
- }
- public void setQueueName(AMQShortString name)
- {
- _queueName = name;
+
}
public String getOption(String key)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Thu Feb 15 15:23:48 2007
@@ -40,29 +40,17 @@
AMQShortString getExchangeClass();
- void setExchangeClass(AMQShortString name);
-
AMQShortString getExchangeName();
- void setExchangeName(AMQShortString name);
-
AMQShortString getDestinationName();
- void setDestinationName(AMQShortString name);
-
AMQShortString getQueueName();
- void setQueueName(AMQShortString name);
-
String getOption(String key);
- void setOption(String key, String value);
-
boolean containsOption(String key);
AMQShortString getRoutingKey();
-
- void setRoutingKey(AMQShortString key);
String toString();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/URLHelper.java Thu Feb 15 15:23:48 2007
@@ -114,11 +114,11 @@
if (sepIndex >= options.length() || sepIndex == 0)
{
- parseError(valueIndex, "Unterminated option", options);
+ throw parseError(valueIndex, "Unterminated option", options);
}
else
{
- parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
+ throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
options.charAt(sepIndex) + "'", options);
}
}
@@ -136,14 +136,14 @@
}
- public static void parseError(int index, String error, String url) throws URLSyntaxException
+ public static URLSyntaxException parseError(int index, String error, String url)
{
- parseError(index, 1, error, url);
+ return parseError(index, 1, error, url);
}
- public static void parseError(int index, int length, String error, String url) throws URLSyntaxException
+ public static URLSyntaxException parseError(int index, int length, String error, String url)
{
- throw new URLSyntaxException(url, error, index, length);
+ return new URLSyntaxException(url, error, index, length);
}
public static String printOptions(HashMap<String, String> options)
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Feb 15 15:23:48 2007
@@ -58,7 +58,7 @@
assertTrue(channelCount == 1);
AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
false, new AMQShortString("test"), true, _virtualHost);
- AMQChannel channel = new AMQChannel(2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore, null);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -69,7 +69,7 @@
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(3, _messageStore, null);
+ AMQChannel channel3 = new AMQChannel(_protocolSession,3, _messageStore, null);
channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
@@ -89,14 +89,14 @@
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(5, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession,5, _messageStore, null));
_mbean.closeConnection();
try
{
channelCount = _mbean.channels().size();
assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(6, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession,6, _messageStore, null));
fail();
}
catch(AMQException ex)
@@ -109,13 +109,14 @@
protected void setUp() throws Exception
{
super.setUp();
- _channel = new AMQChannel(1, _messageStore, null);
+
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
_queueRegistry = _virtualHost.getQueueRegistry();
_exchangeRegistry = _virtualHost.getExchangeRegistry();
_mockIOSession = new MockIoSession();
_protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true));
+ _channel = new AMQChannel(_protocolSession,1, _messageStore, null);
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Feb 15 15:23:48 2007
@@ -78,8 +78,9 @@
assertFalse(mgr.hasActiveSubscribers());
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- _channel = new AMQChannel(1, _messageStore, null);
+
_protocolSession = new MockProtocolSession(_messageStore);
+ _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
_protocolSession.addChannel(_channel);
_queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Thu Feb 15 15:23:48 2007
@@ -75,8 +75,9 @@
{
super.setUp();
_messageStore = new TestableMemoryMessageStore();
- _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
_protocolSession = new MockProtocolSession(_messageStore);
+ _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/);
+
_protocolSession.addChannel(_channel);
_subscriptionManager = new SubscriptionSet();
_queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=508235&r1=508234&r2=508235
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Thu Feb 15 15:23:48 2007
@@ -97,12 +97,12 @@
}
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
{
return null;
}
- public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
{
return null;
}