You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/05/17 17:32:19 UTC
svn commit: r538968 - in /incubator/qpid/trunk/qpid: ./
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/exchange/
java/broker/src/main/java/org/apache/qpid/server/queue/
java/broker/src/test/java/org/a...
Author: rupertlssmith
Date: Thu May 17 08:32:18 2007
New Revision: 538968
URL: http://svn.apache.org/viewvc?view=rev&rev=538968
Log:
Merged revisions 538084-538097,538099-538108,538110-538906,538908-538912 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r538084 | ritchiem | 2007-05-15 09:02:42 +0100 (Tue, 15 May 2007) | 1 line
QPID-466 Removed Unsupported exception from setIntProperty with STRICT_AMQP set
........
r538240 | ritchiem | 2007-05-15 17:19:01 +0100 (Tue, 15 May 2007) | 6 lines
QPID-3 Topic Matching with tests
A simple naive approach. Similar to C++ to be included for M2.
More elaborate pre-evaluated version will have to wait.
Once benchmarks have been performed we can evaluate performance advantages if any of that approach.
........
r538882 | ritchiem | 2007-05-17 13:12:34 +0100 (Thu, 17 May 2007) | 3 lines
Fix for broken CSDM message purging routine that was causing python test_get to fail.
Replaced long while control with a method call that is easier to understand and has more comments.
........
r538912 | ritchiem | 2007-05-17 14:26:25 +0100 (Thu, 17 May 2007) | 2 lines
Fixed failing python tests. The rather annoying way we unsubscribe subscribers by creating new ones was causing a problem as the closing channel had been closed before the unsubscribe call.
Java now passes all python tests
........
Added:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
- copied, changed from r538882, incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu May 17 08:32:18 2007
@@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -42,12 +43,12 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.Subscription;
-import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.*;
@@ -59,7 +60,7 @@
private final int _channelId;
- //private boolean _transactional;
+ // private boolean _transactional;
private long _prefetch_HighWaterMark;
@@ -119,14 +120,12 @@
private Set<Long> _browsedAcks = new HashSet<Long>();
- //Why do we need this reference ? - ritchiem
+ // Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
private boolean _closing;
-
- public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
- throws
- AMQException
+ public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager,
+ MessageStore messageStore, MessageRouter exchanges) throws AMQException
{
_session = session;
_channelId = channelId;
@@ -145,7 +144,8 @@
*/
public void setLocalTransactional()
{
- _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
+ _txnContext =
+ new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
}
public boolean isTransactional()
@@ -176,7 +176,6 @@
return _prefetchSize;
}
-
public void setPrefetchSize(long prefetchSize)
{
_prefetchSize = prefetchSize;
@@ -202,31 +201,26 @@
_prefetch_HighWaterMark = prefetchCount;
}
-
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher)
- throws
- AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
{
-
- _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
- _txnContext);
+ _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
_currentMessage.setPublisher(publisher);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws
- AMQException
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException
{
if (_currentMessage == null)
{
throw new AMQException("Received content header without previously receiving a BasicPublish frame");
- } else
+ }
+ else
{
if (_log.isTraceEnabled())
{
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
+
_currentMessage.setContentHeaderBody(contentHeaderBody);
_currentMessage.setExpiration();
@@ -241,9 +235,7 @@
}
}
- public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
- throws
- AMQException
+ public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException
{
if (_currentMessage == null)
{
@@ -254,12 +246,15 @@
{
_log.trace(debugIdentity() + "Content body received on channel " + _channelId);
}
+
try
{
// returns true iff the message was delivered (i.e. if all data was
// received
- if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody)))
+ if (_currentMessage.addContentBodyFrame(_storeContext,
+ protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
+ contentBody)))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
@@ -276,9 +271,7 @@
}
}
- protected void routeCurrentMessage()
- throws
- AMQException
+ protected void routeCurrentMessage() throws AMQException
{
try
{
@@ -316,15 +309,13 @@
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive)
- throws
- AMQException,
- ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
+
if (_consumerTag2QueueMap.containsKey(tag))
{
throw new ConsumerTagNotUniqueException();
@@ -332,13 +323,11 @@
queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
+
return tag;
}
-
- public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag)
- throws
- AMQException
+ public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag) throws AMQException
{
final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
@@ -353,38 +342,44 @@
* @param session The session to close
* @throws AMQException if there is an error during closure
*/
- public void close(AMQProtocolSession session)
- throws
- AMQException
+ public void close(AMQProtocolSession session) throws AMQException
{
- _closing = true;
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
+
+ setClosing(true);
+ }
+
+ private void setClosing(boolean closing)
+ {
+ _closing = closing;
}
- private void unsubscribeAllConsumers(AMQProtocolSession session)
- throws
- AMQException
+ private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
{
if (_log.isInfoEnabled())
{
if (!_consumerTag2QueueMap.isEmpty())
{
_log.info("Unsubscribing all consumers on channel " + toString());
- } else
+ }
+ else
{
_log.info("No consumers to unsubscribe on channel " + toString());
}
}
+
for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
{
if (_log.isInfoEnabled())
{
_log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
+
me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
}
+
_consumerTag2QueueMap.clear();
}
@@ -404,12 +399,13 @@
if (queue == null)
{
_log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
- } else
+ }
+ else
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
- ") with a queue(" + queue + ") for " + consumerTag);
+ _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
+ + ") with a queue(" + queue + ") for " + consumerTag);
}
}
}
@@ -434,9 +430,7 @@
*
* @throws org.apache.qpid.AMQException if the requeue fails
*/
- public void requeue()
- throws
- AMQException
+ public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -454,20 +448,20 @@
if (!(_txnContext instanceof NonTransactionalContext))
{
-// if (_nonTransactedContext == null)
+ // if (_nonTransactedContext == null)
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _nonTransactedContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- } else
+ }
+ else
{
deliveryContext = _txnContext;
}
}
-
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
@@ -483,7 +477,7 @@
// Should we allow access To the DM to directy deliver the message?
// As we don't need to check for Consumers or worry about incrementing the message count?
-// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
+ // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
}
}
@@ -495,9 +489,7 @@
* @param deliveryTag The message to requeue
* @throws AMQException If something goes wrong.
*/
- public void requeue(long deliveryTag)
- throws
- AMQException
+ public void requeue(long deliveryTag) throws AMQException
{
UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
@@ -518,74 +510,71 @@
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
-// if (_nonTransactedContext == null)
+ // if (_nonTransactedContext == null)
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _nonTransactedContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- } else
+ }
+ else
{
deliveryContext = _txnContext;
}
if (unacked.queue != null)
{
- //Redeliver the messages to the front of the queue
+ // Redeliver the messages to the front of the queue
deliveryContext.deliver(unacked.message, unacked.queue, true);
- //Deliver increments the message count but we have already deliverted this once so don't increment it again
+ // Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
- } else
+ }
+ else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
- " but no queue defined and no DeadLetter queue so DROPPING message.");
-// _log.error("Requested requeue of message:" + deliveryTag +
-// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
-//
-// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
-//
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
+ + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+ // _log.error("Requested requeue of message:" + deliveryTag +
+ // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
+ //
+ // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
+ //
}
- } else
+ }
+ else
{
- _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
+ _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+ + _unacknowledgedMessageMap.size());
if (_log.isDebugEnabled())
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- int count = 0;
-
- public boolean callback(UnacknowledgedMessage message)
- throws
- AMQException
{
- _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
- "[" + message.deliveryTag + "]");
- return false; // Continue
- }
+ int count = 0;
- public void visitComplete()
- {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(
+ (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
- }
- });
+ return false; // Continue
+ }
+
+ public void visitComplete()
+ { }
+ });
}
}
-
}
-
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
* @param requeue Are the messages to be requeued or dropped.
* @throws AMQException When something goes wrong.
*/
- public void resend(final boolean requeue)
- throws
- AMQException
+ public void resend(final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
@@ -599,52 +588,53 @@
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- public boolean callback(UnacknowledgedMessage message)
- throws
- AMQException
- {
- AMQShortString consumerTag = message.consumerTag;
- AMQMessage msg = message.message;
- msg.setRedelivered(true);
- if (consumerTag != null)
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- // Consumer exists
- if (_consumerTag2QueueMap.containsKey(consumerTag))
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if (consumerTag != null)
{
- msgToResend.add(message);
- } else // consumer has gone
- {
- msgToRequeue.add(message);
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ msgToResend.add(message);
+ }
+ else // consumer has gone
+ {
+ msgToRequeue.add(message);
+ }
}
- } else
- {
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (message.queue != null)
+ else
{
- if (requeue)
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null)
{
- msgToRequeue.add(message);
- } else
+ if (requeue)
+ {
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ }
+ }
+ else
{
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
- } else
- {
- _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
- }
- // false means continue processing
- return false;
- }
+ // false means continue processing
+ return false;
+ }
- public void visitComplete()
- {
- }
- });
+ public void visitComplete()
+ { }
+ });
// Process Messages to Resend
if (_log.isInfoEnabled())
@@ -652,11 +642,13 @@
if (!msgToResend.isEmpty())
{
_log.info("Preparing (" + msgToResend.size() + ") message to resend.");
- } else
+ }
+ else
{
_log.info("No message to resend.");
}
}
+
for (UnacknowledgedMessage message : msgToResend)
{
AMQMessage msg = message.message;
@@ -665,22 +657,21 @@
// If the client has requested the messages be resent then it is
// their responsibility to ensure that thay are capable of receiving them
// i.e. The channel hasn't been server side suspended.
-// if (isSuspended())
-// {
-// _log.info("Channel is suspended so requeuing");
-// //move this message to requeue
-// msgToRequeue.add(message);
-// }
-// else
-// {
- //release to allow it to be delivered
+ // if (isSuspended())
+ // {
+ // _log.info("Channel is suspended so requeuing");
+ // //move this message to requeue
+ // msgToRequeue.add(message);
+ // }
+ // else
+ // {
+ // release to allow it to be delivered
msg.release(message.queue);
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
-
Subscription sub = msg.getDeliveredSubscription(message.queue);
if (sub != null)
@@ -697,32 +688,38 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message");
+ _log.debug("Subscription(" + System.identityHashCode(sub)
+ + ") closed during resend so requeuing message");
}
- //move this message to requeue
+ // move this message to requeue
msgToRequeue.add(message);
- } else
+ }
+ else
{
if (_log.isDebugEnabled())
{
- _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub));
+ _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:"
+ + System.identityHashCode(sub));
}
+
sub.addToResendQueue(msg);
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
} // sync(sub.getSendLock)
- } else
+ }
+ else
{
if (_log.isInfoEnabled())
{
- _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+ + ")to prevent loss");
}
- //move this message to requeue
+ // move this message to requeue
msgToRequeue.add(message);
}
} // for all messages
-// } else !isSuspend
+ // } else !isSuspend
if (_log.isInfoEnabled())
{
@@ -739,12 +736,13 @@
{
if (_nonTransactedContext == null)
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _nonTransactedContext =
+ new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- } else
+ }
+ else
{
deliveryContext = _txnContext;
}
@@ -769,36 +767,32 @@
* @param queue the queue that has been deleted
* @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(final AMQQueue queue)
- throws
- AMQException
+ public void queueDeleted(final AMQQueue queue) throws AMQException
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- public boolean callback(UnacknowledgedMessage message)
- throws
- AMQException
{
- if (message.queue == queue)
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- try
- {
- message.discard(_storeContext);
- message.queue = null;
- }
- catch (AMQException e)
+ if (message.queue == queue)
{
- _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
- e, e);
+ try
+ {
+ message.discard(_storeContext);
+ message.queue = null;
+ }
+ catch (AMQException e)
+ {
+ _log.error(
+ "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
+ }
}
+
+ return false;
}
- return false;
- }
- public void visitComplete()
- {
- }
- });
+ public void visitComplete()
+ { }
+ });
}
/**
@@ -809,9 +803,7 @@
* acknowledges the single message specified by the delivery tag
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
- throws
- AMQException
+ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
synchronized (_unacknowledgedMessageMap.getLock())
{
@@ -828,6 +820,7 @@
}
}
+
checkSuspension();
}
@@ -845,8 +838,9 @@
{
boolean suspend;
- suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+ suspend =
+ ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark))
+ || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()));
setSuspended(suspend);
}
@@ -867,12 +861,13 @@
if (wasSuspended)
{
_log.debug("Unsuspending channel " + this);
- //may need to deliver queued messages
+ // may need to deliver queued messages
for (AMQQueue q : _consumerTag2QueueMap.values())
{
q.deliverAsync();
}
- } else
+ }
+ else
{
_log.debug("Suspending channel " + this);
}
@@ -884,20 +879,17 @@
return _suspended.get();
}
- public void commit()
- throws
- AMQException
+ public void commit() throws AMQException
{
if (!isTransactional())
{
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _txnContext.commit();
+
+ _txnContext.commit();
}
- public void rollback()
- throws
- AMQException
+ public void rollback() throws AMQException
{
_txnContext.rollback();
}
@@ -908,6 +900,7 @@
sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional());
sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
sb.append("/").append(_prefetch_HighWaterMark);
+
return sb.toString();
}
@@ -926,41 +919,40 @@
return _storeContext;
}
- public void processReturns(AMQProtocolSession session)
- throws
- AMQException
+ public void processReturns(AMQProtocolSession session) throws AMQException
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
- session.getProtocolOutputConverter().writeReturn(message, _channelId,
- bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
}
+
_returnMessages.clear();
}
-
public boolean wouldSuspend(AMQMessage msg)
{
if (isSuspended())
{
return true;
- } else
+ }
+ else
{
- boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ boolean willSuspend =
+ ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark));
if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
- willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
+ willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize));
}
-
if (willSuspend)
{
setSuspended(true);
}
+
return willSuspend;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Thu May 17 08:32:18 2007
@@ -23,6 +23,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -56,6 +58,10 @@
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private static final String TOPIC_SEPARATOR = ".";
+ private static final String AMQP_STAR = "*";
+ private static final String AMQP_HASH = "#";
/** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
@@ -78,7 +84,7 @@
AMQShortString key = entry.getKey();
List<String> queueList = new ArrayList<String>();
- List<AMQQueue> queues = entry.getValue();
+ List<AMQQueue> queues = getMatchedQueues(key);
for (AMQQueue q : queues)
{
queueList.add(q.getName().toString());
@@ -118,10 +124,13 @@
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
- public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+ assert rKey != null;
+
+ AMQShortString routingKey = normalize(rKey);
+
_logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
@@ -142,15 +151,67 @@
}
+ private AMQShortString normalize(AMQShortString routingKey)
+ {
+ StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+ List<String> _subscription = new ArrayList<String>();
+
+ while (routingTokens.hasMoreTokens())
+ {
+ _subscription.add(routingTokens.nextToken());
+ }
+
+ int size = _subscription.size();
+
+ for (int index = 0; index < size; index++)
+ {
+ //if there are more levels
+ if (index + 1 < size)
+ {
+ if (_subscription.get(index).equals(AMQP_HASH))
+ {
+ if (_subscription.get(index + 1).equals(AMQP_HASH))
+ {
+ // we don't need #.# delete this one
+ _subscription.remove(index);
+ size--;
+ //redo this normalisation
+ index--;
+ }
+
+ if (_subscription.get(index + 1).equals(AMQP_STAR))
+ {
+ // we don't want #.* swap to *.#
+ // remove it and put it in at index + 1
+ _subscription.add(index + 1, _subscription.remove(index));
+ }
+ }
+ }//if we have more levels
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ for (String s : _subscription)
+ {
+ sb.append(s);
+ sb.append(TOPIC_SEPARATOR);
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+
+ return new AMQShortString(sb.toString());
+ }
+
public void route(AMQMessage payload) throws AMQException
{
MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = info.getRoutingKey();
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ final AMQShortString routingKey = normalize(info.getRoutingKey());
+
+ List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
- if (queues == null)
+ if (queues == null || queues.size() == 0)
{
if (info.isMandatory())
{
@@ -177,14 +238,14 @@
public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
{
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return queues != null && queues.contains(queue);
}
public boolean isBound(AMQShortString routingKey) throws AMQException
{
- List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
return queues != null && !queues.isEmpty();
}
@@ -205,10 +266,12 @@
return !_routingKey2queues.isEmpty();
}
- public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+ assert rKey != null;
+
+ AMQShortString routingKey = normalize(rKey);
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
if (queues == null)
@@ -240,5 +303,111 @@
_logger.error("Exception occured in creating the topic exchenge mbean", ex);
throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
}
+ }
+
+
+ private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
+ {
+ List<AMQQueue> list = new LinkedList<AMQQueue>();
+ StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR);
+
+ ArrayList<String> routingkeyList = new ArrayList<String>();
+
+ while (routingTokens.hasMoreTokens())
+ {
+ String next = routingTokens.nextToken();
+ if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH))
+ {
+ continue;
+ }
+
+ routingkeyList.add(next);
+ }
+
+ for (AMQShortString queue : _routingKey2queues.keySet())
+ {
+ StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR);
+
+ ArrayList<String> queueList = new ArrayList<String>();
+
+ while (queTok.hasMoreTokens())
+ {
+ queueList.add(queTok.nextToken());
+ }
+
+
+ int depth = 0;
+ boolean matching = true;
+ boolean done = false;
+ int routingskip = 0;
+ int queueskip = 0;
+
+ while (matching && !done)
+ {
+ if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+ {
+ done = true;
+
+ // if it was the routing key that ran out of digits
+ if (routingkeyList.size() == depth + routingskip)
+ {
+ if (queueList.size() > (depth + queueskip))
+ { // a hash and it is the last entry
+ matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+ }
+ }
+ else if (routingkeyList.size() > depth + routingskip)
+ {
+ // There is still more routing key to check
+ matching = false;
+ }
+
+
+ continue;
+ }
+
+ // if the values on the two topics don't match
+ if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip)))
+ {
+ if (queueList.get(depth + queueskip).equals(AMQP_STAR))
+ {
+ depth++;
+
+ continue;
+ }
+ else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
+ {
+ // Is this a # at the end
+ if (queueList.size() == depth + queueskip + 1)
+ {
+ done = true;
+ continue;
+ }
+
+ // otherwise # in the middle
+ while (routingkeyList.size() > depth + routingskip)
+ {
+ if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
+ {
+ queueskip++;
+ depth++;
+ break;
+ }
+ routingskip++;
+ }
+ continue;
+ }
+ matching = false;
+ }
+ depth++;
+ }
+
+ if (matching)
+ {
+ list.addAll(_routingKey2queues.get(queue));
+ }
+ }
+
+ return list;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu May 17 08:32:18 2007
@@ -451,13 +451,7 @@
AMQMessage message = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (message != null
- && (
- ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
- || sub == null)
- && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
- || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
- )
+ while (purgeMessage(message, sub))
{
//remove the already taken message or expired
AMQMessage removed = messages.poll();
@@ -476,6 +470,54 @@
}
return message;
+ }
+
+ /**
+ *
+ * @param message
+ * @param sub
+ * @return
+ * @throws AMQException
+ */
+ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ {
+ //Original.. complicated while loop control
+// (message != null
+// && (
+// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
+// || sub == null)
+// && message.taken(_queue, sub));
+
+ boolean purge = false;
+
+ // if the message is null then don't purge as we have no messagse.
+ if (message != null)
+ {
+ // if we have a subscriber perform message checks
+ if (sub != null)
+ {
+ // Check that the message hasn't expired.
+ if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ {
+ return true;
+ }
+
+ // if we have a queue browser(we don't purge) so check mark the message as taken
+ purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ }
+ else
+ {
+ // if there is no subscription we are doing
+ // a get or purging so mark message as taken.
+ message.isTaken(_queue);
+ // and then ensure that it gets purged
+ purge = true;
+ }
+ }
+
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+ return purge && message.taken(_queue, sub);
}
public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
Copied: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (from r538882, incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?view=diff&rev=538968&p1=incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java&r1=538882&p2=incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java&r2=538968
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Thu May 17 08:32:18 2007
@@ -14,33 +14,35 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.exchange;
-import junit.framework.TestCase;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
import junit.framework.Assert;
-import org.apache.qpid.server.queue.AMQQueue;
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
+// import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.LinkedList;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class DestWildExchangeTest extends TestCase
{
@@ -51,22 +53,20 @@
MessageStore _store;
StoreContext _context;
-
public void setUp() throws AMQException
{
_exchange = new DestWildExchange();
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+ // _store = new MemoryMessageStore();
_store = new MemoryMessageStore();
_context = new StoreContext();
}
-
public void testNoRoute() throws AMQException
{
AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
-
MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
AMQMessage message = new AMQMessage(0L, info, null);
@@ -78,7 +78,7 @@
}
catch (NoRouteException nre)
{
- //normal
+ // normal
}
Assert.assertEquals(0, queue.getMessageCount());
@@ -89,7 +89,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("ab"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.b"), queue, null);
-
AMQMessage message = createMessage("a.b");
try
@@ -109,7 +108,6 @@
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a.c");
try
@@ -119,19 +117,16 @@
fail("Message has no route and should fail to be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
}
-
public void testStarMatch() throws AMQException
{
AMQQueue queue = new AMQQueue(new AMQShortString("a*"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.*"), queue, null);
-
AMQMessage message = createMessage("a.b");
try
@@ -151,7 +146,6 @@
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a.c");
try
@@ -171,7 +165,6 @@
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a");
try
@@ -181,8 +174,7 @@
fail("Message has no route and should fail to be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -192,7 +184,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.#"), queue, null);
-
AMQMessage message = createMessage("a.b.c");
try
@@ -212,7 +203,6 @@
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a.b");
try
@@ -232,7 +222,6 @@
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a.c");
try
@@ -271,7 +260,6 @@
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("b");
try
@@ -281,19 +269,16 @@
fail("Message has no route and should fail to be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
}
-
public void testMidHash() throws AMQException
{
AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
-
AMQMessage message = createMessage("a.c.d.b");
try
@@ -339,7 +324,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null);
-
AMQMessage message = createMessage("a.c.b.b");
try
@@ -349,12 +333,10 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a.a.b.c");
try
@@ -383,8 +365,7 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
@@ -410,13 +391,11 @@
}
-
public void testHashAfterHash() throws AMQException
{
AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null);
-
AMQMessage message = createMessage("a.c.b.b.c");
try
@@ -426,12 +405,10 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
-
message = createMessage("a.a.b.c.d");
try
@@ -458,7 +435,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null);
-
AMQMessage message = createMessage("a.c.b.b.c");
try
@@ -468,8 +444,7 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
@@ -499,7 +474,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null);
-
AMQMessage message = createMessage("a.b.c");
try
@@ -509,8 +483,7 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
@@ -521,7 +494,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.b"), queue, null);
-
AMQMessage message = createMessage("a.b.c");
try
@@ -531,8 +503,7 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
@@ -543,7 +514,6 @@
AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost);
_exchange.registerQueue(new AMQShortString("a.b"), queue, null);
-
AMQMessage message = createMessage("a");
try
@@ -553,8 +523,7 @@
fail("Message has route and should not be routed");
}
catch (AMQException nre)
- {
- }
+ { }
Assert.assertEquals(0, queue.getMessageCount());
@@ -564,16 +533,15 @@
{
MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
- TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ TransactionalContext trancontext =
+ new NonTransactionalContext(_store, _context, null, new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
AMQMessage message = new AMQMessage(0L, info, trancontext);
message.setContentHeaderBody(new ContentHeaderBody());
return message;
}
-
class PublishInfo implements MessagePublishInfo
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu May 17 08:32:18 2007
@@ -467,11 +467,6 @@
public void setIntProperty(String propertyName, int i) throws JMSException
{
- if (_strictAMQP)
- {
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
- }
-
checkWritableProperties();
JMSHeaderAdapter.checkPropertyName(propertyName);
super.setIntProperty(new AMQShortString(propertyName), new Integer(i));
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=538968&r1=538967&r2=538968
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Thu May 17 08:32:18 2007
@@ -250,7 +250,7 @@
;
}
- assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ assertEquals("Wrong number of messages bounced: ", 1, _bouncedMessageList.size());
Message m = _bouncedMessageList.get(0);
assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));