You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/28 13:18:14 UTC
svn commit: r631938 [2/5] - in /incubator/qpid/branches/thegreatmerge/qpid:
./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/exchange/
java/broker/src/main/java/org/apache/qpid/ser...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Feb 28 04:16:41 2008
@@ -208,27 +208,10 @@
{
_logger.debug("Frame Received: " + frame);
}
+
+
+ body.handle(channelId, this);
- if (body instanceof AMQMethodBody)
- {
- methodFrameReceived(channelId, (AMQMethodBody) body);
- }
- else if (body instanceof ContentHeaderBody)
- {
- contentHeaderReceived(channelId, (ContentHeaderBody) body);
- }
- else if (body instanceof ContentBody)
- {
- contentBodyReceived(channelId, (ContentBody) body);
- }
- else if (body instanceof HeartbeatBody)
- {
- // NO OP
- }
- else
- {
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
- }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -271,7 +254,7 @@
}
}
- private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
+ public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
@@ -365,7 +348,7 @@
}
catch (Exception e)
{
- _stateManager.error(e);
+
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
@@ -375,7 +358,7 @@
}
}
- private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
@@ -384,11 +367,16 @@
}
- private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentBody(body, this);
+ }
+
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
+ {
+ // NO - OP
}
/**
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Feb 28 04:16:41 2008
@@ -32,6 +32,7 @@
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.SessionUtil;
@@ -83,7 +84,7 @@
createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
+ final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
getConfiguredObject(ConnectorConfiguration.class);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb 28 04:16:41 2008
@@ -102,7 +102,7 @@
private long _expiration;
- private final int hashcode = System.identityHashCode(this);
+
private Exchange _exchange;
private static final boolean SYNCED_CLOCKS =
@@ -111,7 +111,7 @@
public String debugIdentity()
{
- return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
+ return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
public void setExpiration()
@@ -160,6 +160,11 @@
_exchange.route(this);
}
+ public void enqueue(final List<AMQQueue> queues)
+ {
+ _transientMessageData.setDestinationQueues(queues);
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -763,7 +768,7 @@
}
finally
{
- destinationQueues.clear();
+
// Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb 28 04:16:41 2008
@@ -39,11 +39,7 @@
import javax.management.JMException;
import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -149,6 +145,8 @@
public AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -192,6 +190,13 @@
_subscriptionFactory = subscriptionFactory;
_deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
_queueId = s_queueID++;
+
+ // This ensures that the notification checks for the configured alerts are created.
+ setMaximumMessageAge(_maximumMessageAge);
+ setMaximumMessageCount(_maximumMessageCount);
+ setMaximumMessageSize(_maximumMessageSize);
+ setMaximumQueueDepth(_maximumQueueDepth);
+
}
private AMQQueueMBean createMBean() throws AMQException
@@ -206,7 +211,7 @@
}
}
- public AMQShortString getName()
+ public final AMQShortString getName()
{
return _name;
}
@@ -539,9 +544,17 @@
return _maximumMessageSize;
}
- public void setMaximumMessageSize(long value)
+ public void setMaximumMessageSize(final long maximumMessageSize)
{
- _maximumMessageSize = value;
+ _maximumMessageSize = maximumMessageSize;
+ if(maximumMessageSize == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
+ }
}
public int getConsumerCount()
@@ -564,9 +577,20 @@
return _maximumMessageCount;
}
- public void setMaximumMessageCount(long value)
+ public void setMaximumMessageCount(final long maximumMessageCount)
{
- _maximumMessageCount = value;
+ _maximumMessageCount = maximumMessageCount;
+ if(maximumMessageCount == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ }
+
+
+
}
public long getMaximumQueueDepth()
@@ -575,9 +599,18 @@
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(long value)
+ public void setMaximumQueueDepth(final long maximumQueueDepth)
{
- _maximumQueueDepth = value;
+ _maximumQueueDepth = maximumQueueDepth;
+ if(maximumQueueDepth == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
+ }
+
}
public long getOldestMessageArrivalTime()
@@ -682,6 +715,10 @@
}
_subscribers.addSubscriber(subscription);
+ if(exclusive)
+ {
+ _subscribers.setExclusive(true);
+ }
}
private boolean isExclusive()
@@ -713,6 +750,7 @@
consumerTag, this));
}
+ _subscribers.setExclusive(false);
Subscription removedSubscription;
if ((removedSubscription =
@@ -827,7 +865,7 @@
public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
+ _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -960,6 +998,14 @@
public void setMaximumMessageAge(long maximumMessageAge)
{
_maximumMessageAge = maximumMessageAge;
+ if(maximumMessageAge == 0L)
+ {
+ _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
+ else
+ {
+ _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
+ }
}
public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
@@ -1040,5 +1086,10 @@
_deliveryMgr.removeExpired();
}
}
+ }
+
+ public final Set<NotificationCheck> getNotificationChecks()
+ {
+ return _notificationChecks;
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Feb 28 04:16:41 2008
@@ -54,10 +54,7 @@
import javax.management.openmbean.TabularType;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
/**
* MBean class for AMQQueue. It implements all the management features exposed
@@ -97,6 +94,9 @@
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
private Notification _lastNotification = null;
+
+
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -249,16 +249,21 @@
public void checkForNotification(AMQMessage msg) throws AMQException, JMException
{
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+ final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
- for (NotificationCheck check : NotificationCheck.values())
+ if(!notificationChecks.isEmpty())
{
- if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
{
- if (check.notifyIfNecessary(msg, _queue, this))
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- _lastNotificationTimes[check.ordinal()] = currentTime;
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Feb 28 04:16:41 2008
@@ -363,8 +363,9 @@
protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-entry.getSize());
+
}
+ _totalMessageSize.addAndGet(-entry.getSize());
if (!acks)
{
@@ -884,7 +885,7 @@
{
if (!s.isSuspended())
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Thu Feb 28 04:16:41 2008
@@ -51,7 +51,7 @@
ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
{
- _routingKey = routingKey;
+ _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
_exchange = exchange;
_arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
}
@@ -74,8 +74,7 @@
public int hashCode()
{
return (_exchange == null ? 0 : _exchange.hashCode())
- + (_routingKey == null ? 0 : _routingKey.hashCode())
- + (_arguments == null ? 0 : _arguments.hashCode());
+ + (_routingKey == null ? 0 : _routingKey.hashCode());
}
public boolean equals(Object o)
@@ -86,8 +85,7 @@
}
ExchangeBinding eb = (ExchangeBinding) o;
return _exchange.equals(eb._exchange)
- && _routingKey.equals(eb._routingKey)
- && _arguments.equals(eb._arguments);
+ && _routingKey.equals(eb._routingKey);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Feb 28 04:16:41 2008
@@ -29,9 +29,9 @@
{
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
- int msgCount = queue.getMessageCount();
+ int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
return true;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb 28 04:16:41 2008
@@ -292,14 +292,17 @@
queue.dequeue(storeContext, entry);
}
+/*
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
+ }
+*/
+
synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
- {
- _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
- }
if (_acks)
{
@@ -308,10 +311,11 @@
protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
- if (!_acks)
- {
- entry.getMessage().decrementReference(storeContext);
- }
+
+ }
+ if (!_acks)
+ {
+ entry.getMessage().decrementReference(storeContext);
}
}
finally
@@ -367,59 +371,60 @@
// return false;
}
- final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+
//todo - client id should be recoreded and this test removed but handled below
- if (_noLocal && publisher != null)
+ if (_noLocal)
{
- // We don't want local messages so check to see if message is one we sent
- Object localInstance;
- Object msgInstance;
- if ((protocolSession.getClientProperties() != null) &&
- (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
+ if(publisher != null)
+
{
+ // We don't want local messages so check to see if message is one we sent
+ Object localInstance;
+ Object msgInstance;
- if ((publisher.getClientProperties() != null) &&
- (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+ if ((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (localInstance == msgInstance || localInstance.equals(msgInstance))
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ if (localInstance == msgInstance || localInstance.equals(msgInstance))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
}
- }
- else
- {
+ else
+ {
- localInstance = protocolSession.getClientIdentifier();
- //todo - client id should be recoreded and this test removed but handled here
+ localInstance = protocolSession.getClientIdentifier();
+ //todo - client id should be recorded and this test removed but handled here
- msgInstance = publisher.getClientIdentifier();
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
-// msg.debugIdentity() + ")");
-// }
- return false;
+ msgInstance = publisher.getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ // if (_logger.isTraceEnabled())
+ // {
+ // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ // msg.debugIdentity() + ")");
+ // }
+ return false;
+ }
}
- }
-
+ }
}
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
- }
return checkFilters(entry);
}
@@ -433,23 +438,7 @@
private boolean checkFilters(QueueEntry msg)
{
- if (_filters != null)
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has filters.");
-// }
- return _filters.allAllow(msg.getMessage());
- }
- else
- {
-// if (_logger.isTraceEnabled())
-// {
-// _logger.trace("(" + debugIdentity() + ") has no filters");
-// }
-
- return true;
- }
+ return (_filters == null) || _filters.allAllow(msg.getMessage());
}
public Queue<QueueEntry> getPreDeliveryQueue()
@@ -613,7 +602,7 @@
public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg.getMessage());
+ return _acks && channel.wouldSuspend(msg.getMessage());
}
public Queue<QueueEntry> getResendQueue()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Thu Feb 28 04:16:41 2008
@@ -39,6 +39,7 @@
private int _currentSubscriber;
private final Object _changeLock = new Object();
+ private volatile boolean _exclusive;
/** Accessor for unit tests. */
@@ -116,10 +117,7 @@
*/
public Subscription nextSubscriber(QueueEntry msg)
{
- if (_subscriptions.isEmpty())
- {
- return null;
- }
+
try
{
@@ -143,30 +141,64 @@
private Subscription nextSubscriberImpl(QueueEntry msg)
{
- final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext())
+ if(_exclusive)
{
- Subscription subscription = iterator.next();
- ++_currentSubscriber;
- subscriberScanned();
-
- if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ try
{
- if (subscription.hasInterest(msg))
+ Subscription subscription = _subscriptions.get(0);
+ subscriberScanned();
+
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ if (subscription.hasInterest(msg))
{
- return subscription;
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
}
}
}
+ catch(IndexOutOfBoundsException e)
+ {
+ }
+ return null;
}
+ else
+ {
+ if (_subscriptions.isEmpty())
+ {
+ return null;
+ }
+ final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+ while (iterator.hasNext())
+ {
+ Subscription subscription = iterator.next();
+ ++_currentSubscriber;
+ subscriberScanned();
- return null;
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
}
/** Overridden in test classes. */
@@ -233,5 +265,14 @@
{
return _changeLock;
}
-
+
+ public void setExclusive(final boolean exclusive)
+ {
+ _exclusive = exclusive;
+ }
+
+ public boolean getExcBoolean()
+ {
+ return _exclusive;
+ }
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Thu Feb 28 04:16:41 2008
@@ -22,6 +22,8 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -60,7 +62,7 @@
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+ private List<AMQQueue> _destinationQueues;
public MessagePublishInfo getMessagePublishInfo()
{
@@ -74,7 +76,7 @@
public List<AMQQueue> getDestinationQueues()
{
- return _destinationQueues;
+ return _destinationQueues == null ? (List<AMQQueue>) Collections.EMPTY_LIST : _destinationQueues;
}
public void setDestinationQueues(List<AMQQueue> destinationQueues)
@@ -109,6 +111,10 @@
public void addDestinationQueue(AMQQueue queue)
{
+ if(_destinationQueues == null)
+ {
+ _destinationQueues = new ArrayList<AMQQueue>();
+ }
_destinationQueues.add(queue);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Feb 28 04:16:41 2008
@@ -49,11 +49,11 @@
/** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
- private Set<Long> _browsedAcks;
+ private final Set<Long> _browsedAcks;
private final MessageStore _messageStore;
- private StoreContext _storeContext;
+ private final StoreContext _storeContext;
/** Whether we are in a transaction */
private boolean _inTran;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Thu Feb 28 04:16:41 2008
@@ -24,7 +24,6 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Feb 28 04:16:41 2008
@@ -23,6 +23,7 @@
import javax.management.NotCompliantMBeanException;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.security.access.AccessManager;
@@ -126,7 +127,7 @@
*/
public VirtualHost(String name, MessageStore store) throws Exception
{
- this(name, null, store);
+ this(name, new PropertiesConfiguration(), store);
}
/**
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Feb 28 04:16:41 2008
@@ -178,7 +178,7 @@
public void testQueueDepthAlertWithSubscribers() throws Exception
{
protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 2,_txm, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 2,_txm, _messageStore);
protocolSession.addChannel(channel);
// Create queue
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Feb 28 04:16:41 2008
@@ -129,7 +129,7 @@
TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
- AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore, null);
+ AMQChannel channel = new AMQChannel(protocolSession, 1,_txm, _messageStore);
protocolSession.addChannel(channel);
_queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client-java14/pom.xml Thu Feb 28 04:16:41 2008
@@ -1,224 +1,224 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client-java14</artifactId>
- <packaging>jar</packaging>
- <version>1.0-incubating-M3-SNAPSHOT</version>
- <name>Qpid Client for Java 1.4</name>
- <url>http://cwiki.apache.org/confluence/display/qpid</url>
-
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid</artifactId>
- <version>1.0-incubating-M3-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <properties>
- <topDirectoryLocation>..</topDirectoryLocation>
- <java.source.version>1.4</java.source.version>
- <qpid.version>${pom.version}</qpid.version>
- <qpid.targetDir>${project.build.directory}</qpid.targetDir>
- <!--<qpid.root>${basedir}/..</qpid.root>-->
- <sasl.properties>${basedir}/etc/sasl.properties</sasl.properties>
-
- <!-- This is a dummy value to ensure this property exists, override in your settings.xml to your real 1.4 jdk location to run tests. -->
- <jvm.1.4.bin>path/to/java1.4</jvm.1.4.bin>
- </properties>
-
- <dependencies>
-
- <!-- These dependencies have to be re-declared here, because exluding the normal (non 1.4) client and common from the distribution takes out
- these transitive dependencies too. -->
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.mina</groupId>
- <artifactId>mina-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- </dependency>
-
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </dependency>
-
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
-
- <!-- Use the java 1.4 retrotranslated client. -->
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client</artifactId>
- <type>jar</type>
- <version>${pom.version}</version>
- <classifier>java14</classifier>
- </dependency>
-
- <!-- Use the java 1.4 retrotranslated common library. -->
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-common</artifactId>
- <type>jar</type>
- <version>${pom.version}</version>
- <classifier>java14</classifier>
- </dependency>
-
- <!-- Use the java 1.4 retrotranslated integration tests. -->
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-integrationtests</artifactId>
- <type>jar</type>
- <version>${pom.version}</version>
- <classifier>java14</classifier>
- <!--<scope>test</scope>-->
- </dependency>
-
- <dependency>
- <groupId>net.sf.retrotranslator</groupId>
- <artifactId>retrotranslator-runtime</artifactId>
- <scope>package</scope>
- </dependency>
-
- <!-- Test dependencies. -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
-
- <!-- Sets up the compiler plugin to compile on 1.4 -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>${java.source.version}</source>
- <target>${java.source.version}</target>
- </configuration>
- </plugin>
-
- <!-- Sets up the assembly plugin to use the assembly directions to build a 1.4 compatable client. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>${assembly.version}</version>
-
- <executions>
-
- <!-- Produces the distribution. -->
- <execution>
- <id>assembly-dist</id>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/client-java14-bin.xml</descriptor>
- </descriptors>
- <finalName>qpid-${pom.version}</finalName>
- <outputDirectory>${qpid.targetDir}</outputDirectory>
- <tarLongFileMode>gnu</tarLongFileMode>
- </configuration>
- </execution>
-
- <!-- Produces a jar with all test dependencies in it. For convenience in running tests from command line. -->
- <!-- Todo: Replace this with a manifest only jar, its much quicker to build that. -->
- <execution>
- <id>assembly-alltestdeps</id>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/jar-with-dependencies.xml</descriptor>
- </descriptors>
- <outputDirectory>target</outputDirectory>
- <workDirectory>target/assembly/work</workDirectory>
- </configuration>
- </execution>
-
- </executions>
- </plugin>
-
- <!-- Sets up surefire to run during the integration-test phase instead of the test phase. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- <executions>
- <execution>
- <id>surefire-it</id>
- <phase>integration-test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- <forkMode>once</forkMode>
- <jvm>${jvm.1.4.bin}</jvm>
- <systemProperties>
- <property>
- <name>amqj.logging.level</name>
- <value>${amqj.logging.level}</value>
- </property>
- <property>
- <name>log4j.configuration</name>
- <value>${log4j.configuration}</value>
- </property>
- <property>
- <name>amq.dynamicsaslregistrar.properties</name>
- <value>${sasl.properties}</value>
- </property>
- </systemProperties>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
-</project>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client-java14</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2.1-SNAPSHOT</version>
+ <name>Qpid Client for Java 1.4</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ <java.source.version>1.4</java.source.version>
+ <qpid.version>${pom.version}</qpid.version>
+ <qpid.targetDir>${project.build.directory}</qpid.targetDir>
+ <!--<qpid.root>${basedir}/..</qpid.root>-->
+ <sasl.properties>${basedir}/etc/sasl.properties</sasl.properties>
+
+ <!-- This is a dummy value to ensure this property exists, override in your settings.xml to your real 1.4 jdk location to run tests. -->
+ <jvm.1.4.bin>path/to/java1.4</jvm.1.4.bin>
+ </properties>
+
+ <dependencies>
+
+ <!-- These dependencies have to be re-declared here, because exluding the normal (non 1.4) client and common from the distribution takes out
+ these transitive dependencies too. -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+
+ <!-- Use the java 1.4 retrotranslated client. -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ <type>jar</type>
+ <version>${pom.version}</version>
+ <classifier>java14</classifier>
+ </dependency>
+
+ <!-- Use the java 1.4 retrotranslated common library. -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-common</artifactId>
+ <type>jar</type>
+ <version>${pom.version}</version>
+ <classifier>java14</classifier>
+ </dependency>
+
+ <!-- Use the java 1.4 retrotranslated integration tests. -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-integrationtests</artifactId>
+ <type>jar</type>
+ <version>${pom.version}</version>
+ <classifier>java14</classifier>
+ <!--<scope>test</scope>-->
+ </dependency>
+
+ <dependency>
+ <groupId>net.sf.retrotranslator</groupId>
+ <artifactId>retrotranslator-runtime</artifactId>
+ <scope>package</scope>
+ </dependency>
+
+ <!-- Test dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Sets up the compiler plugin to compile on 1.4 -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${java.source.version}</source>
+ <target>${java.source.version}</target>
+ </configuration>
+ </plugin>
+
+ <!-- Sets up the assembly plugin to use the assembly directions to build a 1.4 compatable client. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${assembly.version}</version>
+
+ <executions>
+
+ <!-- Produces the distribution. -->
+ <execution>
+ <id>assembly-dist</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/client-java14-bin.xml</descriptor>
+ </descriptors>
+ <finalName>qpid-${pom.version}</finalName>
+ <outputDirectory>${qpid.targetDir}</outputDirectory>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ </configuration>
+ </execution>
+
+ <!-- Produces a jar with all test dependencies in it. For convenience in running tests from command line. -->
+ <!-- Todo: Replace this with a manifest only jar, its much quicker to build that. -->
+ <execution>
+ <id>assembly-alltestdeps</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/jar-with-dependencies.xml</descriptor>
+ </descriptors>
+ <outputDirectory>target</outputDirectory>
+ <workDirectory>target/assembly/work</workDirectory>
+ </configuration>
+ </execution>
+
+ </executions>
+ </plugin>
+
+ <!-- Sets up surefire to run during the integration-test phase instead of the test phase. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <id>surefire-it</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>true</skip>
+ <forkMode>once</forkMode>
+ <jvm>${jvm.1.4.bin}</jvm>
+ <systemProperties>
+ <property>
+ <name>amqj.logging.level</name>
+ <value>${amqj.logging.level}</value>
+ </property>
+ <property>
+ <name>log4j.configuration</name>
+ <value>${log4j.configuration}</value>
+ </property>
+ <property>
+ <name>amq.dynamicsaslregistrar.properties</name>
+ <value>${sasl.properties}</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/pom.xml Thu Feb 28 04:16:41 2008
@@ -79,7 +79,7 @@
</dependency>
<dependency>
- <groupId>uk.co.thebadgerset</groupId>
+ <groupId>org.apache.qpid</groupId>
<artifactId>junit-toolkit</artifactId>
<scope>test</scope>
</dependency>
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Feb 28 04:16:41 2008
@@ -73,6 +73,105 @@
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
+ public static final class ChannelToSessionMap
+ {
+ private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+ private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+ private int _size = 0;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
+ public AMQSession get(int channelId)
+ {
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ return _fastAccessSessions[channelId];
+ }
+ else
+ {
+ return _slowAccessSessions.get(channelId);
+ }
+ }
+
+ public AMQSession put(int channelId, AMQSession session)
+ {
+ AMQSession oldVal;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ oldVal = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = session;
+ }
+ else
+ {
+ oldVal = _slowAccessSessions.put(channelId, session);
+ }
+ if((oldVal != null) && (session == null))
+ {
+ _size--;
+ }
+ else if((oldVal == null) && (session != null))
+ {
+ _size++;
+ }
+
+ return session;
+
+ }
+
+
+ public AMQSession remove(int channelId)
+ {
+ AMQSession session;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ session = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = null;
+ }
+ else
+ {
+ session = _slowAccessSessions.remove(channelId);
+ }
+
+ if(session != null)
+ {
+ _size--;
+ }
+ return session;
+
+ }
+
+ public Collection<AMQSession> values()
+ {
+ ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessSessions[i] != null)
+ {
+ values.add(_fastAccessSessions[i]);
+ }
+ }
+ values.addAll(_slowAccessSessions.values());
+
+ return values;
+ }
+
+ public int size()
+ {
+ return _size;
+ }
+
+ public void clear()
+ {
+ _size = 0;
+ _slowAccessSessions.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessSessions[i] = null;
+ }
+ }
+ }
+
+
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
protected AtomicInteger _idFactory = new AtomicInteger(0);
@@ -102,7 +201,7 @@
protected AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+ private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
private String _clientName;
@@ -266,6 +365,26 @@
_delegate = new AMQConnectionDelegate_0_8(this);
}
+ final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
+
+ class Listener implements ExceptionListener
+ {
+ public void onException(JMSException e)
+ {
+ exceptions.add(e);
+ }
+ }
+
+ try
+ {
+ setExceptionListener(new Listener());
+ }
+ catch (JMSException e)
+ {
+ // Shouldn't happen
+ throw new AMQException(null, null, e);
+ }
+
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
@@ -322,8 +441,6 @@
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
- lastException = null;
- _connected = true;
}
catch (Exception e)
{
@@ -351,7 +468,23 @@
{
String message = null;
- if (lastException != null)
+ if (exceptions.size() > 0)
+ {
+ JMSException e = exceptions.get(exceptions.size() - 1);
+ int code = -1;
+ try
+ {
+ code = new Integer(e.getErrorCode()).intValue();
+ }
+ catch (NumberFormatException nfe)
+ {
+ // Ignore this, we have some error codes and messages swapped around
+ }
+
+ throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
+ e.getMessage(), e);
+ }
+ else if (lastException != null)
{
if (lastException.getCause() != null)
{
@@ -671,10 +804,10 @@
checkNotClosed();
if (!_started)
{
- final Iterator it = _sessions.entrySet().iterator();
+ final Iterator it = _sessions.values().iterator();
while (it.hasNext())
{
- final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+ final AMQSession s = (AMQSession) (it.next());
try
{
s.start();
@@ -927,11 +1060,11 @@
return _maximumFrameSize;
}
- public Map getSessions()
+ public ChannelToSessionMap getSessions()
{
return _sessions;
}
-
+
public String getUsername()
{
return _username;
@@ -1149,6 +1282,24 @@
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
+ }
+
+ /**
+ * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+ * The caller must hold the failover mutex before calling this method.
+ */
+ public void resubscribeSesssions() throws JMSException, AMQException, FailoverException
+ {
+ ArrayList sessions = new ArrayList(_sessions.values());
+ _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+ for (Iterator it = sessions.iterator(); it.hasNext();)
+ {
+ AMQSession s = (AMQSession) it.next();
+ // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+ s.resubscribe();
+ s.setFlowControl(true);
+ }
}
public String toString()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Thu Feb 28 04:16:41 2008
@@ -88,6 +88,7 @@
checkState();
final BasicMessageConsumer consumer =
(BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ consumer.closeWhenNoMessages(true);
_consumers.add(consumer);
return new Enumeration()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb 28 04:16:41 2008
@@ -23,6 +23,7 @@
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -109,6 +110,89 @@
*/
public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+ public static final class IdToConsumerMap
+ {
+ private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+ private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+
+
+ public BasicMessageConsumer get(int id)
+ {
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ return _fastAccessConsumers[id];
+ }
+ else
+ {
+ return _slowAccessConsumers.get(id);
+ }
+ }
+
+ public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ {
+ BasicMessageConsumer oldVal;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ oldVal = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = consumer;
+ }
+ else
+ {
+ oldVal = _slowAccessConsumers.put(id, consumer);
+ }
+
+ return consumer;
+
+ }
+
+
+ public BasicMessageConsumer remove(int id)
+ {
+ BasicMessageConsumer consumer;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ consumer = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = null;
+ }
+ else
+ {
+ consumer = _slowAccessConsumers.remove(id);
+ }
+
+ return consumer;
+
+ }
+
+ public Collection<BasicMessageConsumer> values()
+ {
+ ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessConsumers[i] != null)
+ {
+ values.add(_fastAccessConsumers[i]);
+ }
+ }
+ values.addAll(_slowAccessConsumers.values());
+
+ return values;
+ }
+
+
+ public void clear()
+ {
+ _slowAccessConsumers.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessConsumers[i] = null;
+ }
+ }
+ }
+
+
+
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -158,7 +242,7 @@
protected boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- protected int _acknowledgeMode;
+ protected final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
protected int _channelId;
@@ -224,8 +308,10 @@
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- protected Map<AMQShortString, BasicMessageConsumer> _consumers =
- new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+
+ //Map<AMQShortString, BasicMessageConsumer> _consumers =
+ //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -288,6 +374,27 @@
/** Has failover occured on this session */
private boolean _failedOver;
+
+
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl= flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Creates a new session on a connection.
*
@@ -334,24 +441,20 @@
{
public void aboveThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Above threshold(" + _defaultPrefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(true)).start();
- }
+
}
public void underThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(false)).start();
- }
+
}
});
}
@@ -697,7 +800,7 @@
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
if (consumer != null)
{
// fixme this isn't right.. needs to check if _queue contains data for this consumer
@@ -787,6 +890,7 @@
false, false);
}
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -804,6 +908,17 @@
messageSelector, null, false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ messageSelector, null, false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
String selector) throws JMSException
{
@@ -905,7 +1020,7 @@
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1072,9 +1187,10 @@
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
}
+
/**
* Creates a non-durable subscriber with a message selector
*
@@ -1092,7 +1208,7 @@
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1648,7 +1764,7 @@
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- if (_consumers.remove(consumer.getConsumerTag()) != null)
+ if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2047,8 +2163,9 @@
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
+ int tagId = _nextTag++;
// need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+ AMQShortString tag = new AMQShortString(Integer.toString(tagId));
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -2068,7 +2185,7 @@
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
- _consumers.put(tag, consumer);
+ _consumers.put(tagId, consumer);
try
{
@@ -2096,7 +2213,7 @@
catch (AMQException e)
{
// clean-up the map in the event of an error
- _consumers.remove(tag);
+ _consumers.remove(tagId);
throw e;
}
}
@@ -2670,6 +2787,25 @@
_ticket = ticket;
}
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ }
+
+
+ public void checkFlowControl() throws InterruptedException
+ {
+ synchronized(_flowControl)
+ {
+ while(!_flowControl.getFlowControl())
+ {
+ _flowControl.wait();
+ }
+ }
+
+ }
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -2874,38 +3010,38 @@
//This if block is not needed anymore as bounce messages are handled separately
//if (message.getDeliverBody() != null)
//{
- final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag()));
+ final BasicMessageConsumer consumer =
+ _consumers.get(message.getConsumerTag().toIntValue());
- if ((consumer == null) || consumer.isClosed())
+ if ((consumer == null) || consumer.isClosed())
+ {
+ if (_dispatcherLogger.isInfoEnabled())
{
- if (_dispatcherLogger.isInfoEnabled())
+ if (consumer == null)
{
- if (consumer == null)
- {
- _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+ _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+ message.getDeliveryTag() + "] from queue "
+ message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
- }
- else
- {
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliveryTag() + "] from queue " + " consumer("
- + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
- }
}
- // Don't reject if we're already closing
- if (!_closed.get())
+ else
{
- rejectMessage(message, true);
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ + message.getDeliveryTag() + "] from queue " + " consumer("
+ + message.getConsumerTag() + ") is closed rejecting(requeue)...");
}
}
- else
+ // Don't reject if we're already closing
+ if (!_closed.get())
{
- consumer.notifyMessage(message, _channelId);
+ rejectMessage(message, true);
}
}
- //}
+ else
+ {
+ consumer.notifyMessage(message);
+ }
+
+ }
}
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Feb 28 04:16:41 2008
@@ -222,7 +222,7 @@
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
- _consumers.put(tag, consumer);
+ _consumers.put(tag.toIntValue(), consumer);
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame jmsConsume = getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,