You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/19 11:35:25 UTC
svn commit: r497770 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/ha...
Author: rgreig
Date: Fri Jan 19 02:35:21 2007
New Revision: 497770
URL: http://svn.apache.org/viewvc?view=rev&rev=497770
Log:
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests
Removed:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Jan 19 02:35:21 2007
@@ -37,6 +37,7 @@
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.mina.common.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,8 @@
private long _prefetch_LowWaterMark;
+ private long _prefetchSize;
+
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
@@ -108,6 +111,8 @@
private Set<Long> _browsedAcks = new HashSet<Long>();
+
+
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -151,6 +156,17 @@
_prefetch_HighWaterMark = prefetchCount;
}
+ public long getPrefetchSize()
+ {
+ return _prefetchSize;
+ }
+
+
+ public void setPrefetchSize(long prefetchSize)
+ {
+ _prefetchSize = prefetchSize;
+ }
+
public long getPrefetchLowMarkCount()
{
return _prefetch_LowWaterMark;
@@ -213,14 +229,15 @@
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- // returns true iff the message was delivered (i.e. if all data was
- // received
if (_log.isDebugEnabled())
{
_log.debug("Content body received on channel " + _channelId);
}
try
{
+
+ // returns true iff the message was delivered (i.e. if all data was
+ // received
if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
{
// callback to allow the context to do any post message processing
@@ -269,13 +286,14 @@
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
* @param noLocal
+ * @param exclusive
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -286,7 +304,7 @@
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -364,8 +382,10 @@
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(final AMQProtocolSession session) throws AMQException
+ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
+ final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
@@ -374,7 +394,20 @@
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null && (consumerTag == null || requeue))
+ {
+ msgToRequeue.add(message);
+ }
+ }
// false means continue processing
return false;
}
@@ -383,6 +416,12 @@
{
}
});
+
+ for(UnacknowledgedMessage message : msgToRequeue)
+ {
+ _txnContext.deliver(message.message, message.queue);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ }
}
/**
@@ -459,8 +498,9 @@
{
boolean suspend;
- suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
-
+ suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
setSuspended(suspend);
}
@@ -544,5 +584,32 @@
message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
+ }
+
+
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ if (isSuspended())
+ {
+ return true;
+ }
+ else
+ {
+ boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ if(!willSuspend)
+ {
+ final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+
+ willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
+ }
+
+
+ if(willSuspend)
+ {
+ setSuspended(true);
+ }
+ return willSuspend;
+ }
+
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Fri Jan 19 02:35:21 2007
@@ -73,5 +73,7 @@
* @return a set of delivery tags
*/
Set<Long> getDeliveryTags();
+
+ public long getUnacknowledgeBytes();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Jan 19 02:35:21 2007
@@ -32,6 +32,8 @@
{
private final Object _lock = new Object();
+ private long _unackedSize;
+
private Map<Long, UnacknowledgedMessage> _map;
private long _lastDeliveryTag;
@@ -77,7 +79,8 @@
{
for (UnacknowledgedMessage msg : msgs)
{
- _map.remove(msg.deliveryTag);
+ remove(msg.deliveryTag);
+
}
}
}
@@ -86,7 +89,14 @@
{
synchronized (_lock)
{
- return _map.remove(deliveryTag);
+
+ UnacknowledgedMessage message = _map.remove(deliveryTag);
+ if(message != null)
+ {
+ _unackedSize -= message.message.getSize();
+ }
+
+ return message;
}
}
@@ -113,6 +123,7 @@
synchronized (_lock)
{
_map.put(deliveryTag, message);
+ _unackedSize += message.message.getSize();
_lastDeliveryTag = deliveryTag;
}
}
@@ -123,6 +134,7 @@
{
Collection<UnacknowledgedMessage> currentEntries = _map.values();
_map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ _unackedSize = 0l;
return currentEntries;
}
}
@@ -149,6 +161,7 @@
synchronized (_lock)
{
_map.clear();
+ _unackedSize = 0l;
}
}
@@ -169,6 +182,7 @@
}
it.remove();
+ _unackedSize -= unacked.getValue().message.getSize();
destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
@@ -189,7 +203,10 @@
AMQShortString consumerTag = entry.getValue().consumerTag;
AMQMessage msg = entry.getValue().message;
- msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ if(consumerTag != null)
+ {
+ msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ }
}
}
}
@@ -223,5 +240,10 @@
}
}
}
+ }
+
+ public long getUnacknowledgeBytes()
+ {
+ return _unackedSize;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Jan 19 02:35:21 2007
@@ -38,6 +38,8 @@
*/
private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
+ private Exchange _defaultExchange;
+
public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
@@ -53,9 +55,18 @@
public void registerExchange(Exchange exchange)
{
+ if(_defaultExchange == null)
+ {
+ setDefaultExchange(exchange);
+ }
_exchangeMap.put(exchange.getName(), exchange);
}
+ public void setDefaultExchange(Exchange exchange)
+ {
+ _defaultExchange = exchange;
+ }
+
public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
@@ -72,7 +83,16 @@
public Exchange getExchange(AMQShortString name)
{
- return _exchangeMap.get(name);
+
+ if(name == null || name.length() == 0)
+ {
+ return _defaultExchange;
+ }
+ else
+ {
+ return _exchangeMap.get(name);
+ }
+
}
/**
@@ -83,7 +103,7 @@
public void routeContent(AMQMessage payload) throws AMQException
{
final AMQShortString exchange = payload.getPublishBody().exchange;
- final Exchange exch = _exchangeMap.get(exchange);
+ final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
// the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Fri Jan 19 02:35:21 2007
@@ -38,4 +38,6 @@
void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException;
Exchange getExchange(AMQShortString name);
+
+ void setDefaultExchange(Exchange exchange);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -22,6 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
@@ -66,6 +68,7 @@
}
else
{
+
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
if (queue == null)
@@ -73,29 +76,13 @@
_log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- AMQShortString msg = new AMQShortString("No such queue, '" + body.queue + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_FOUND.getCode(), // replyCode
- msg)); // replyText
+ String msg = "No such queue, '" + body.queue + "'";
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
}
else
{
- AMQShortString msg = new AMQShortString("No queue name provided, no default queue defined.");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ String msg = "No queue name provided, no default queue defined.";
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg );
}
}
else
@@ -103,7 +90,7 @@
try
{
AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
- body.arguments, body.noLocal);
+ body.arguments, body.noLocal, body.exclusive);
if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -143,6 +130,21 @@
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg)); // replyText
}
+ catch (AMQQueue.ExistingExclusiveSubscription e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer");
+ }
+
}
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicPublishBody;
@@ -42,7 +43,6 @@
private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
- private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name");
public static BasicPublishMethodHandler getInstance()
{
@@ -74,19 +74,8 @@
// if the exchange does not exist we raise a channel exception
if (e == null)
{
- protocolSession.closeChannel(evt.getChannelId());
- // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
- // then we can remove the hardcoded 0,0
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- ChannelCloseBody.getClazz((byte)8, (byte)0), // classId
- ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId
- 500, // replyCode
- UNKNOWN_EXCHANGE_NAME); // replyText
- protocolSession.writeFrame(cf);
+ throw body.getChannelException(500, "Unknown exchange name");
+
}
else
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Fri Jan 19 02:35:21 2007
@@ -44,6 +44,8 @@
AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+ session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -52,6 +52,8 @@
{
throw new AMQException("Unknown channel " + evt.getChannelId());
}
- channel.resend(protocolSession);
+ BasicRecoverBody body = evt.getMethod();
+ channel.resend(protocolSession, body.requeue);
+
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Fri Jan 19 02:35:21 2007
@@ -56,21 +56,22 @@
AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
ConnectionOpenBody body = evt.getMethod();
- AMQShortString contextKey = body.virtualHost;
+
+
//todo //FIXME The virtual host must be validated by the server for the connection to open-ok
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (contextKey == null)
+ if (protocolSession.getContextKey() == null)
{
- contextKey = generateClientID();
+ protocolSession.setContextKey(generateClientID());
}
- protocolSession.setContextKey(contextKey);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
(byte)8, (byte)0, // AMQP version (major, minor)
- contextKey); // knownHosts
+ body.virtualHost); // knownHosts
stateManager.changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeFrame(response);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Fri Jan 19 02:35:21 2007
@@ -76,7 +76,7 @@
{
if(body.passive && ((body.type == null) || body.type.length() ==0))
{
- throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange);
}
else
{
@@ -89,7 +89,7 @@
}
catch(AMQUnknownExchangeType e)
{
- throw new AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e);
}
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri Jan 19 02:35:21 2007
@@ -77,22 +77,19 @@
{
body.queue = createName();
}
+
+ AMQQueue queue = null;
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
synchronized (queueRegistry)
{
- AMQQueue queue;
+
if (((queue = queueRegistry.getQueue(body.queue)) == null) )
{
if(body.passive)
{
String msg = "Queue: " + body.queue + " not found.";
- throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(),
- msg,
- body.getClazz(),
- body.getMethod(),
- (byte)8,
- (byte)0 );
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
}
else
@@ -112,9 +109,16 @@
}
}
}
+ else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ {
+ // todo - constant
+ throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+
+ }
//set this as the default queue on the channel:
protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
+
if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -122,8 +126,8 @@
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
(byte)8, (byte)0, // AMQP version (major, minor)
- 0L, // consumerCount
- 0L, // messageCount
+ queue.getConsumerCount(), // consumerCount
+ queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeFrame(response);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri Jan 19 02:35:21 2007
@@ -34,6 +34,7 @@
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
@@ -84,15 +85,12 @@
{
if(body.ifEmpty && !queue.isEmpty())
{
- AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is not empty.");
- // TODO - Error code
- session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+ throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." );
}
else if(body.ifUnused && !queue.isUnused())
- {
- AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is still used.");
+ {
// TODO - Error code
- session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+ throw body.getChannelException(406, "Queue: " + body.queue + " is still used." );
}
else
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Jan 19 02:35:21 2007
@@ -57,7 +57,7 @@
protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession);
+ channel.resend(protocolSession, false);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Jan 19 02:35:21 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -57,6 +58,9 @@
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+
+
private final IoSession _minaProtocolSession;
private AMQShortString _contextKey;
@@ -218,31 +222,36 @@
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
-
- if(!_frameListeners.isEmpty())
+ try
{
- for (AMQMethodListener listener : _frameListeners)
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+
+ if(!_frameListeners.isEmpty())
{
- wasAnyoneInterested = listener.methodReceived(evt) ||
- wasAnyoneInterested;
+ for (AMQMethodListener listener : _frameListeners)
+ {
+ wasAnyoneInterested = listener.methodReceived(evt) ||
+ wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
}
}
- if (!wasAnyoneInterested)
+ catch (AMQChannelException e)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ _logger.error("Closing channel due to: " + e.getMessage());
+ writeFrame(e.getCloseFrame(frame.channel));
+ closeChannel(frame.channel);
+ }
+ catch (AMQConnectionException e)
+ {
+ _logger.error("Closing connection due to: " + e.getMessage());
+ closeSession();
+ writeFrame(e.getCloseFrame(frame.channel));
}
}
- catch (AMQChannelException e)
- {
- _logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.channel));
- }
- catch (AMQConnectionException e)
- {
- _logger.error("Closing connection due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.channel));
- }
catch (Exception e)
{
_stateManager.error(e);
@@ -516,6 +525,10 @@
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
+ if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+ {
+ setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+ }
}
/**
@@ -536,5 +549,11 @@
public boolean amqpVersionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
+ }
+
+
+ public Object getClientIdentifier()
+ {
+ return _minaProtocolSession.getRemoteAddress();
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Jan 19 02:35:21 2007
@@ -124,4 +124,6 @@
FieldTable getClientProperties();
void setClientProperties(FieldTable clientProperties);
+
+ Object getClientIdentifier();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java Fri Jan 19 02:35:21 2007
@@ -33,6 +33,8 @@
define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+ registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Jan 19 02:35:21 2007
@@ -43,8 +43,6 @@
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- public static final String JMS_MESSAGE = "jms.message";
-
/**
* Used in clustering
*/
@@ -75,6 +73,8 @@
private TransientMessageData _transientMessageData = new TransientMessageData();
+
+
/**
* Used to iterate through all the body frames associated with this message. Will not
* keep all the data in memory therefore is memory-efficient.
@@ -550,6 +550,7 @@
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
contentHeader);
+
protocolSession.writeFrame(compositeBlock);
}
else
@@ -582,6 +583,50 @@
}
+ public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+
+ AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentBody(_messageId, i);
+ protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ }
+
+
+ }
+
+
+ }
+
+
private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
@@ -595,6 +640,21 @@
return buf;
}
+ private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ BasicPublishBody pb = getPublishBody();
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0,
+ deliveryTag, pb.exchange,
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
@@ -642,6 +702,24 @@
protocolSession.writeFrame(bodyFrameIterator.next());
}
}
+
+
+ public long getSize()
+ {
+ try
+ {
+ long size = getContentHeaderBody().bodySize;
+
+ return size;
+ }
+ catch (AMQException e)
+ {
+ _log.error(e);
+ return 0;
+ }
+
+ }
+
public String toString()
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Jan 19 02:35:21 2007
@@ -29,11 +29,14 @@
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
@@ -41,6 +44,30 @@
*/
public class AMQQueue implements Managable, Comparable
{
+
+ public static final class ExistingExclusiveSubscription extends AMQException
+ {
+
+ public ExistingExclusiveSubscription()
+ {
+ super("");
+ }
+ }
+
+ public static final class ExistingSubscriptionPreventsExclusive extends AMQException
+ {
+
+ public ExistingSubscriptionPreventsExclusive()
+ {
+ super("");
+ }
+ }
+
+ private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription();
+ private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
+
+
+
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final AMQShortString _name;
@@ -64,6 +91,11 @@
private final SubscriptionFactory _subscriptionFactory;
+ private final AtomicInteger _subscriberCount = new AtomicInteger();
+
+ private final AtomicBoolean _isExclusive = new AtomicBoolean();
+
+
/**
* Manages message delivery.
*/
@@ -187,31 +219,7 @@
_managedObject.register();
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
-
- //fixme - Make this configurable via the broker config.xml
- if (System.getProperties().getProperty("deliverymanager") != null)
- {
- if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
- {
- _logger.info("Using ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
- else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
- {
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
- }
- else
- {
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
- }
- }
- else
- {
- _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -352,9 +360,9 @@
/**
* removes all the messages from the queue.
*/
- public void clearQueue(StoreContext storeContext) throws AMQException
+ public long clearQueue(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.clearAllMessages(storeContext);
+ return _deliveryMgr.clearAllMessages(storeContext);
}
public void bind(AMQShortString routingKey, Exchange exchange)
@@ -362,14 +370,30 @@
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters) throws AMQException
- {
- registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
- }
- public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
{
+ if(incrementSubscriberCount() > 1)
+ {
+ if(isExclusive())
+ {
+ decrementSubscriberCount();
+ throw EXISTING_EXCLUSIVE;
+ }
+ else if(exclusive)
+ {
+ decrementSubscriberCount();
+ throw EXISTING_SUBSCRIPTION;
+ }
+
+ }
+ else if(exclusive)
+ {
+ setExclusive(true);
+ }
+
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
@@ -385,6 +409,28 @@
_subscribers.addSubscriber(subscription);
}
+
+ private boolean isExclusive()
+ {
+ return _isExclusive.get();
+ }
+
+ private void setExclusive(boolean exclusive)
+ {
+ _isExclusive.set(exclusive);
+ }
+
+ private int incrementSubscriberCount()
+ {
+ return _subscriberCount.incrementAndGet();
+ }
+
+ private int decrementSubscriberCount()
+ {
+ return _subscriberCount.decrementAndGet();
+ }
+
+
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
@@ -400,6 +446,10 @@
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ setExclusive(false);
+ decrementSubscriberCount();
+
+
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -454,6 +504,23 @@
delete();
}
+ public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
+ {
+ _deliveryMgr.deliver(storeContext, getName(), msg);
+ try
+ {
+ msg.checkDeliveredToConsumer();
+ updateReceivedMessageCount(msg);
+ }
+ catch (NoConsumersException e)
+ {
+ // as this message will be returned, it should be removed
+ // from the queue:
+ dequeue(storeContext, msg);
+ }
+ }
+
+
public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
_deliveryMgr.deliver(storeContext, getName(), msg);
@@ -547,4 +614,12 @@
_logger.debug(MessageFormat.format(msg, args));
}
}
+
+ public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
+ {
+ return _deliveryMgr.performGet(session, channel, acks);
+ }
+
+
+
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Jan 19 02:35:21 2007
@@ -28,6 +28,8 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.ArrayList;
import java.util.Iterator;
@@ -52,6 +54,9 @@
* Holds any queued messages
*/
private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+ private final ReentrantLock _messageAccessLock = new ReentrantLock();
+
//private int _messageCount;
/**
* Ensures that only one asynchronous task is running for this manager at
@@ -169,6 +174,56 @@
}
}
+ public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
+ {
+ AMQMessage msg = getNextMessage();
+ if(msg == null)
+ {
+ return false;
+ }
+ else
+ {
+
+ try
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!acks)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ _queue.dequeue(channel.getStoreContext(), msg);
+ }
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+ }
+
+ msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+ }
+ }
+ finally
+ {
+ msg.setDeliveredToConsumer();
+ }
+ return true;
+
+ }
+ }
+
public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
@@ -178,22 +233,35 @@
}
}
- public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
+ public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
{
+ long count = 0;
AMQMessage msg = poll();
while (msg != null)
{
msg.dequeue(storeContext, _queue);
+ count++;
msg = poll();
}
+ return count;
+ }
+
+ public synchronized AMQMessage getNextMessage() throws AMQException
+ {
+ return getNextMessage(_messages);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ {
+ return getNextMessage(messages, false);
+ }
+
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
{
AMQMessage message = messages.peek();
- while (message != null && (sub.isBrowser() || message.taken()))
+ while (message != null && (browsing || message.taken()))
{
//remove the already taken message
messages.poll();
@@ -208,7 +276,7 @@
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue, sub);
+ message = getNextMessage(messageQueue, sub.isBrowser());
// message will be null if we have no messages in the messageQueue.
if (message == null)
@@ -287,6 +355,7 @@
{
_log.debug(id() + "deliver :" + msg);
}
+ msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Jan 19 02:35:21 2007
@@ -23,6 +23,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.concurrent.Executor;
import java.util.List;
@@ -72,9 +74,11 @@
void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
- void clearAllMessages(StoreContext storeContext) throws AMQException;
+ long clearAllMessages(StoreContext storeContext) throws AMQException;
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
+
+ boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Jan 19 02:35:21 2007
@@ -45,4 +45,6 @@
void close();
boolean isBrowser();
+
+ boolean wouldSuspend(AMQMessage msg);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Jan 19 02:35:21 2007
@@ -66,6 +66,7 @@
private final boolean _isBrowser;
private final Boolean _autoClose;
private boolean _closed = false;
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
public static class Factory implements SubscriptionFactory
{
@@ -300,37 +301,54 @@
{
if (_noLocal)
{
+ boolean isLocal;
// We don't want local messages so check to see if message is one we sent
- Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString());
- Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString());
+ Object localInstance;
+ Object msgInstance;
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ if((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (_logger.isTraceEnabled())
+ if((msg.getPublisher().getClientProperties() != null) &&
+ (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
}
- return false;
}
- else // if not then filter the message.
+ else
{
- if (_logger.isTraceEnabled())
+ localInstance = protocolSession.getClientIdentifier();
+ msgInstance = msg.getPublisher().getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
- _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
- ") but not ours so filtering");
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
}
- return checkFilters(msg);
+
}
+
+
}
- else
+
+
+ if (_logger.isTraceEnabled())
{
- if (_logger.isTraceEnabled())
- {
- _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
- }
- return checkFilters(msg);
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
}
+ return checkFilters(msg);
+
}
private boolean checkFilters(AMQMessage msg)
@@ -391,6 +409,11 @@
public boolean isBrowser()
{
return _isBrowser;
+ }
+
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ return channel.wouldSuspend(msg);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Jan 19 02:35:21 2007
@@ -137,7 +137,7 @@
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended())
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
if (subscription.hasInterest(msg))
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Jan 19 02:35:21 2007
@@ -21,6 +21,8 @@
package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
@@ -28,6 +30,7 @@
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.log4j.Logger;
import java.util.HashMap;
@@ -118,12 +121,14 @@
frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
+ frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance());
frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance());
frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance());
frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
+ frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
@@ -168,10 +173,24 @@
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
+
+ checkChannel(evt, _protocolSession);
+
handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
return true;
}
return false;
+ }
+
+ private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
+ throws AMQException
+ {
+ if(evt.getChannelId() != 0
+ && !(evt.getMethod() instanceof ChannelOpenBody)
+ && protocolSession.getChannel(evt.getChannelId()) == null)
+ {
+ throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId());
+ }
}
protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Jan 19 02:35:21 2007
@@ -269,14 +269,15 @@
private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
+ byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
+ Destination dest = AMQDestination.createDestination(url);
+ jmsMsg.setJMSDestination(dest);
+
if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
- byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
- Destination dest = AMQDestination.createDestination(url);
- jmsMsg.setJMSDestination(dest);
-
}
+
_session.setInRecovery(false);
}
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Fri Jan 19 02:35:21 2007
@@ -134,11 +134,12 @@
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
- public void sendNextMessage(AMQQueue queue)
+ public boolean wouldSuspend(AMQMessage msg)
{
-
+ return _suspended;
}
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Fri Jan 19 02:35:21 2007
@@ -32,6 +32,7 @@
/* AMQP version for which exception ocurred */
private final byte major;
private final byte minor;
+ boolean _closeConnetion;
public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
{
@@ -51,9 +52,12 @@
this.minor = minor;
}
+
+
public AMQFrame getCloseFrame(int channel)
{
return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
}
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Jan 19 02:35:21 2007
@@ -22,6 +22,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
public abstract class AMQMethodBody extends AMQBody
{
@@ -101,4 +102,17 @@
{
return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
}
+
+ public AMQConnectionException getConnectionException(int code, String message)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+ }
+
+
+
+ public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Jan 19 02:35:21 2007
@@ -78,7 +78,7 @@
_protocolSession = new MockProtocolSession(_messageStore);
_protocolSession.addChannel(_channel);
- _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null);
+ _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java Fri Jan 19 02:35:21 2007
@@ -53,7 +53,7 @@
public ConcurrencyTest() throws Exception
{
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
new DefaultQueueRegistry()));
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Fri Jan 19 02:35:21 2007
@@ -172,8 +172,6 @@
public static junit.framework.Test suite()
{
TestSuite suite = new TestSuite();
- suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
return suite;
}
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Jan 19 02:35:21 2007
@@ -132,4 +132,9 @@
public void setClientProperties(FieldTable clientProperties)
{
}
+
+ public Object getClientIdentifier()
+ {
+ return null;
+ }
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=497770&r1=497769&r2=497770
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Fri Jan 19 02:35:21 2007
@@ -67,6 +67,12 @@
return isSuspended;
}
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ return isSuspended;
+ }
+
+
public void queueDeleted(AMQQueue queue)
{
}