You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC
svn commit: r1299257 [7/26] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/
broker-plugins/access-control/src/main/java/org/apache/qpid/server...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,20 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
-public interface AMQSessionModel
+/**
+ * Session model interface.
+ * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
+ * when monitoring the blocking and unblocking of queues/sessions in {@link SimpleAMQQueue}.
+ */
+public interface AMQSessionModel extends Comparable<AMQSessionModel>
{
public Object getID();
@@ -57,4 +65,7 @@ public interface AMQSessionModel
void block(AMQQueue queue);
void unblock(AMQQueue queue);
+
+
+ boolean onSameConnection(InboundMessage inbound);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Sat Mar 10 19:22:10 2012
@@ -21,6 +21,11 @@
package org.apache.qpid.server.protocol;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -29,10 +34,6 @@ import org.apache.qpid.transport.Connect
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
@@ -44,29 +45,35 @@ public class MultiVersionProtocolEngine
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private final AmqpProtocolVersion _defaultSupportedReply;
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
- Set<AmqpProtocolVersion> supported,
- NetworkConnection network,
- long id)
+ public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedReply,
+ final long id,
+ final NetworkConnection network)
{
- this(appRegistry,fqdn,supported,id);
+ this(appRegistry, supported, defaultSupportedReply, id);
setNetworkConnection(network);
}
- public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
- String fqdn,
- Set<AmqpProtocolVersion> supported,
- long id)
+ public MultiVersionProtocolEngine(final IApplicationRegistry appRegistry,
+ final Set<AmqpProtocolVersion> supported,
+ final AmqpProtocolVersion defaultSupportedReply,
+ final long id)
{
+ if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
+ {
+ throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+ + ") to an unsupported protocol version initiation is itself not supported!");
+ }
+
_id = id;
_appRegistry = appRegistry;
- _fqdn = fqdn;
_supported = supported;
-
+ _defaultSupportedReply = defaultSupportedReply;
}
@@ -198,6 +205,15 @@ public class MultiVersionProtocolEngine
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
_network = network;
+ SocketAddress address = _network.getLocalAddress();
+ if (address instanceof InetSocketAddress)
+ {
+ _fqdn = ((InetSocketAddress) address).getHostName();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported socket address class: " + address);
+ }
_sender = sender;
}
@@ -445,14 +461,18 @@ public class MultiVersionProtocolEngine
ServerProtocolEngine newDelegate = null;
- byte[] newestSupported = null;
+ byte[] supportedReplyBytes = null;
+ byte[] defaultSupportedReplyBytes = null;
+ AmqpProtocolVersion supportedReplyVersion = null;
+ //Check the supported versions for a header match, and if there is one save the
+ //delegate. Also save most recent supported version and associated reply header bytes
for(int i = 0; newDelegate == null && i < _creators.length; i++)
{
-
if(_supported.contains(_creators[i].getVersion()))
{
- newestSupported = _creators[i].getHeaderIdentifier();
+ supportedReplyBytes = _creators[i].getHeaderIdentifier();
+ supportedReplyVersion = _creators[i].getVersion();
byte[] compareBytes = _creators[i].getHeaderIdentifier();
boolean equal = true;
for(int j = 0; equal && j<compareBytes.length; j++)
@@ -464,12 +484,35 @@ public class MultiVersionProtocolEngine
newDelegate = _creators[i].getProtocolEngine();
}
}
+
+ //If there is a configured default reply to an unsupported version initiation,
+ //then save the associated reply header bytes when we encounter them
+ if(_defaultSupportedReply != null && _creators[i].getVersion() == _defaultSupportedReply)
+ {
+ defaultSupportedReplyBytes = _creators[i].getHeaderIdentifier();
+ }
}
- // If no delegate is found then send back the most recent support protocol version id
+ // If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
- _sender.send(ByteBuffer.wrap(newestSupported));
+ //if a default reply was specified use its reply header instead of the most recent supported version
+ if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion))
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Default reply to unsupported protocol version was configured, changing reply from "
+ + supportedReplyVersion + " to " + _defaultSupportedReply);
+ }
+
+ supportedReplyBytes = defaultSupportedReplyBytes;
+ supportedReplyVersion = _defaultSupportedReply;
+ }
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Unsupported protocol version requested, replying with: " + supportedReplyVersion);
+ }
+ _sender.send(ByteBuffer.wrap(supportedReplyBytes));
_sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
@@ -482,7 +525,6 @@ public class MultiVersionProtocolEngine
_delegate = newDelegate;
_header.flip();
- _delegate.setNetworkConnection(_network, _sender);
_delegate.received(_header);
if(msg.hasRemaining())
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Sat Mar 10 19:22:10 2012
@@ -20,38 +20,38 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
- private final String _fqdn;
private final Set<AmqpProtocolVersion> _supported;
+ private final AmqpProtocolVersion _defaultSupportedReply;
- public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory(final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply)
{
+ if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
+ {
+ throw new IllegalArgumentException("The configured default reply (" + defaultSupportedReply
+ + ") to an unsupported protocol version initiation is itself not supported!");
+ }
+
_appRegistry = ApplicationRegistry.getInstance();
- _fqdn = fqdn;
_supported = supportedVersions;
- }
-
- public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
- {
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
+ _defaultSupportedReply = defaultSupportedReply;
}
public ServerProtocolEngine newProtocolEngine()
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_appRegistry, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement());
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Sat Mar 10 19:22:10 2012
@@ -21,15 +21,19 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.server.configuration.*;
-import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -86,8 +90,8 @@ public class ProtocolEngine_0_10 extend
_connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
// FIXME Two log messages to maintain compatibility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
}
public SocketAddress getRemoteAddress()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sat Mar 10 19:22:10 2012
@@ -29,7 +29,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -54,6 +53,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.TopicExchange;
@@ -135,7 +135,7 @@ public class SendingLink_1_0 implements
actualFilters.put(entry.getKey(), entry.getValue());
}
- catch (AMQInvalidArgumentException e)
+ catch (ParseException e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 10 19:22:10 2012
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1235000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1299226
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sat Mar 10 19:22:10 2012
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Map;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+
public class AMQPriorityQueue extends OutOfOrderQueue
{
protected AMQPriorityQueue(final String name,
@@ -39,6 +40,6 @@ public class AMQPriorityQueue extends Ou
public int getPriorities()
{
- return ((PriorityQueueList) _entries).getPriorities();
+ return ((PriorityQueueList) getEntries()).getPriorities();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sat Mar 10 19:22:10 2012
@@ -22,20 +22,18 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfig;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -142,10 +140,9 @@ public interface AMQQueue extends Managa
public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
- void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- ServerTransaction transaction);
+ void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
- void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
+ void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
void removeMessagesFromQueue(long fromMessageId, long toMessageId);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -35,6 +33,9 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.HashMap;
+import java.util.Map;
+
public class AMQQueueFactory
{
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
@@ -48,6 +49,10 @@ public class AMQQueueFactory
public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ private AMQQueueFactory()
+ {
+ }
+
private abstract static class QueueProperty
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Sat Mar 10 19:22:10 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -31,12 +32,10 @@ import org.apache.qpid.management.common
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.transport.MessageProperties;
import javax.management.JMException;
@@ -56,9 +55,10 @@ import javax.management.openmbean.Simple
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
-import java.text.SimpleDateFormat;
-import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
@@ -72,11 +72,13 @@ public class AMQQueueMBean extends AMQMa
{
/** Used for debugging purposes. */
- private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+ private static final Logger LOGGER = Logger.getLogger(AMQQueueMBean.class);
/** Date/time format used for message expiration and message timestamp formatting */
public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z";
+ private static final FastDateFormat FAST_DATE_FORMAT = FastDateFormat.getInstance(JMSTIMESTAMP_DATETIME_FORMAT);
+
private final AMQQueue _queue;
private final String _queueName;
// OpenMBean data types for viewMessages method
@@ -347,14 +349,14 @@ public class AMQQueueMBean extends AMQMa
public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
// important : add log to the log file - monitoring tools may be looking for this
- _logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
+ LOGGER.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
notificationMsg = notification.name() + " " + notificationMsg;
_lastNotification =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+ new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, incrementAndGetSequenceNumber(),
System.currentTimeMillis(), notificationMsg);
- _broadcaster.sendNotification(_lastNotification);
+ getBroadcaster().sendNotification(_lastNotification);
}
public Notification getLastNotification()
@@ -491,7 +493,7 @@ public class AMQQueueMBean extends AMQMa
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
headerAttributes = getMessageHeaderProperties(headerBody);
- itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
+ itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.getBodySize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else if(serverMsg instanceof MessageTransferMessage)
{
@@ -589,18 +591,8 @@ public class AMQQueueMBean extends AMQMa
private void addStringifiedJMSTimestamoAndJMSExpiration(final List<String> list,
final long expirationDate, final long timestampDate)
{
- final SimpleDateFormat dateFormat;
- if (expirationDate != 0 || timestampDate != 0)
- {
- dateFormat = new SimpleDateFormat(JMSTIMESTAMP_DATETIME_FORMAT);
- }
- else
- {
- dateFormat = null;
- }
-
- final String formattedExpirationDate = (expirationDate != 0) ? dateFormat.format(new Date(expirationDate)) : null;
- final String formattedTimestampDate = (timestampDate != 0) ? dateFormat.format(new Date(timestampDate)) : null;
+ final String formattedExpirationDate = (expirationDate != 0) ? FAST_DATE_FORMAT.format(expirationDate) : null;
+ final String formattedTimestampDate = (timestampDate != 0) ? FAST_DATE_FORMAT.format(timestampDate) : null;
list.add("JMSExpiration = " + formattedExpirationDate);
list.add("JMSTimestamp = " + formattedTimestampDate);
}
@@ -619,9 +611,7 @@ public class AMQQueueMBean extends AMQMa
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
- txn.commit();
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName);
}
/**
@@ -654,13 +644,7 @@ public class AMQQueueMBean extends AMQMa
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
- _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
-
- txn.commit();
-
-
+ _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName);
}
/**
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Sat Mar 10 19:22:10 2012
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
public interface BaseQueue extends TransactionLogResource
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Sat Mar 10 19:22:10 2012
@@ -41,7 +41,7 @@ public class ConflationQueue extends Sim
public String getConflationKey()
{
- return ((ConflationQueueList) _entries).getConflationKey();
+ return ((ConflationQueueList) getEntries()).getConflationKey();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Sat Mar 10 19:22:10 2012
@@ -21,13 +21,13 @@
package org.apache.qpid.server.queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
public class ConflationQueueList extends SimpleQueueEntryList
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Sat Mar 10 19:22:10 2012
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
public interface Filterable
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Sat Mar 10 19:22:10 2012
@@ -21,9 +21,9 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
public class InboundMessageAdapter implements InboundMessage
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sat Mar 10 19:22:10 2012
@@ -20,25 +20,26 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoredMessage;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.nio.ByteBuffer;
public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
{
@@ -69,8 +70,6 @@ public class IncomingMessage implements
private Exchange _exchange;
-
- private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
// we keep both the original meta data object and the store reference to it just in case the
@@ -79,13 +78,20 @@ public class IncomingMessage implements
private MessageMetaData _messageMetaData;
private StoredMessage<MessageMetaData> _storedMessageHandle;
+ private Object _connectionReference;
public IncomingMessage(
final MessagePublishInfo info
)
{
+ this(info, null);
+ }
+
+ public IncomingMessage(MessagePublishInfo info, Object reference)
+ {
_messagePublishInfo = info;
+ _connectionReference = reference;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -124,12 +130,6 @@ public class IncomingMessage implements
}
- public MessageMetaData headersReceived()
- {
-
- return headersReceived(System.currentTimeMillis());
- }
-
public MessageMetaData headersReceived(long currentTime)
{
_messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -142,21 +142,15 @@ public class IncomingMessage implements
return _destinationQueues;
}
- public int addContentBodyFrame(final ContentChunk contentChunk)
- throws AMQException
+ public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
-
-
-
- return _receivedChunkCount++;
}
public boolean allContentReceived()
{
- return (_bodyLengthReceived == getContentHeader().bodySize);
+ return (_bodyLengthReceived == getContentHeader().getBodySize());
}
public AMQShortString getExchange()
@@ -217,7 +211,7 @@ public class IncomingMessage implements
public long getSize()
{
- return getContentHeader().bodySize;
+ return getContentHeader().getBodySize();
}
public long getMessageNumber()
@@ -251,18 +245,12 @@ public class IncomingMessage implements
return _expiration;
}
- public int getReceivedChunkCount()
- {
- return _receivedChunkCount;
- }
-
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ public ContentChunk getContentChunk(int index)
{
return _contentChunks.get(index);
}
@@ -317,4 +305,14 @@ public class IncomingMessage implements
{
return _storedMessageHandle;
}
+
+ public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }
+
+ public MessageMetaData getMessageMetaData()
+ {
+ return _messageMetaData;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Sat Mar 10 19:22:10 2012
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
public enum NotificationCheck
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Sat Mar 10 19:22:10 2012
@@ -1,11 +1,11 @@
package org.apache.qpid.server.queue;
-import java.util.Map;
-
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+
public abstract class OutOfOrderQueue extends SimpleAMQQueue
{
@@ -20,7 +20,7 @@ public abstract class OutOfOrderQueue ex
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
@@ -29,7 +29,7 @@ public abstract class OutOfOrderQueue ex
QueueContext context = (QueueContext) subscription.getQueueContext();
if(context != null)
{
- QueueEntry released = context._releasedEntry;
+ QueueEntry released = context.getReleasedEntry();
while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
@@ -38,7 +38,7 @@ public abstract class OutOfOrderQueue ex
}
else
{
- released = context._releasedEntry;
+ released = context.getReleasedEntry();
}
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sat Mar 10 19:22:10 2012
@@ -74,7 +74,7 @@ public class PriorityQueueList implement
{
final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
int index;
- for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
+ for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) {};
while(next == null && index != 0)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Sat Mar 10 19:22:10 2012
@@ -25,8 +25,8 @@ import java.util.concurrent.atomic.Atomi
final class QueueContext implements AMQQueue.Context
{
- volatile QueueEntry _lastSeenEntry;
- volatile QueueEntry _releasedEntry;
+ private volatile QueueEntry _lastSeenEntry;
+ private volatile QueueEntry _releasedEntry;
static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
_lastSeenUpdater =
@@ -46,4 +46,10 @@ final class QueueContext implements AMQQ
{
return _lastSeenEntry;
}
+
+
+ QueueEntry getReleasedEntry()
+ {
+ return _releasedEntry;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sat Mar 10 19:22:10 2012
@@ -1,10 +1,8 @@
package org.apache.qpid.server.queue;
-import java.util.Collection;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
/*
*
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sat Mar 10 19:22:10 2012
@@ -34,7 +34,6 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -416,11 +415,19 @@ public abstract class QueueEntryImpl imp
if (alternateExchange != null)
{
- final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
+ List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
final ServerMessage message = getMessage();
- if (rerouteQueues != null && rerouteQueues.size() != 0)
+ if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
{
+ queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+ }
+
+
+ if (queues != null && queues.size() != 0)
+ {
+ final List<? extends BaseQueue> rerouteQueues = queues;
ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Sat Mar 10 19:22:10 2012
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Sat Mar 10 19:22:10 2012
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.queue.QueueRunner;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.transport.TransportException;
+
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to subscriptions, which is necessary
@@ -47,7 +46,6 @@ public class QueueRunner implements Runn
private static int SCHEDULED = 1;
private static int RUNNING = 2;
-
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
private final AtomicBoolean _stateChange = new AtomicBoolean();
@@ -55,8 +53,6 @@ public class QueueRunner implements Runn
private final AtomicLong _lastRunAgain = new AtomicLong();
private final AtomicLong _lastRunTime = new AtomicLong();
- private long _continues;
-
public QueueRunner(SimpleAMQQueue queue)
{
_queue = queue;
@@ -74,24 +70,35 @@ public class QueueRunner implements Runn
runAgain = _queue.processQueue(this);
}
- catch (AMQException e)
+ catch (final AMQException e)
{
_logger.error("Exception during asynchronous delivery by " + toString(), e);
}
- finally
+ catch (final TransportException transe)
{
- CurrentActor.remove();
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
}
- _scheduled.compareAndSet(RUNNING, IDLE);
- long stateChangeCount = _queue.getStateChangeCount();
- _lastRunAgain.set(runAgain);
- _lastRunTime.set(System.nanoTime());
- if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
+ finally
{
- _continues++;
- if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ CurrentActor.remove();
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ final long stateChangeCount = _queue.getStateChangeCount();
+ _lastRunAgain.set(runAgain);
+ _lastRunTime.set(System.nanoTime());
+ if(runAgain == 0L || runAgain != stateChangeCount || _stateChange.compareAndSet(true,false))
{
- _queue.execute(this);
+ if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+ {
+ _queue.execute(this);
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sat Mar 10 19:22:10 2012
@@ -27,12 +27,16 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
+
+import javax.management.JMException;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -99,13 +103,10 @@ public class SimpleAMQQueue implements A
private Exchange _alternateExchange;
- /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
+ private final QueueEntryList<QueueEntry> _entries;
-
- protected final QueueEntryList _entries;
-
- protected final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final SubscriptionList _subscriptionList = new SubscriptionList();
private volatile Subscription _exclusiveSubscriber;
@@ -137,19 +138,19 @@ public class SimpleAMQQueue implements A
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
/** max allowed size(KB) of a single message */
- public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
+ private long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
/** max allowed number of messages on a queue. */
- public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
+ private long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
/** max queue depth for the queue */
- public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
+ private long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
/** maximum message age before alerts occur */
- public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
+ private long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
/** the minimum interval between sending out consecutive alerts of the same type */
- public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
+ private long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
@@ -167,7 +168,7 @@ public class SimpleAMQQueue implements A
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
+ private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -455,7 +456,10 @@ public class SimpleAMQQueue implements A
}
}
- _activeSubscriberCount.incrementAndGet();
+ if(subscription.isActive())
+ {
+ _activeSubscriberCount.incrementAndGet();
+ }
subscription.setStateListener(this);
subscription.setQueueContext(new QueueContext(_entries.getHead()));
@@ -778,7 +782,9 @@ public class SimpleAMQQueue implements A
private boolean mightAssign(final Subscription sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
+ {
return true;
+ }
Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
return (assigned == null) || (assigned == sub);
}
@@ -848,7 +854,7 @@ public class SimpleAMQQueue implements A
QueueContext context = (QueueContext) subscription.getQueueContext();
if(context != null)
{
- QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry subnode = context.getLastSeenEntry();
if(subnode.compareTo(entry)<0)
{
return false;
@@ -872,7 +878,7 @@ public class SimpleAMQQueue implements A
private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
{
QueueContext subContext = (QueueContext) sub.getQueueContext();
- QueueEntry releasedEntry = subContext._releasedEntry;
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
QueueContext._lastSeenUpdater.set(subContext, entry);
if(releasedEntry == entry)
@@ -889,7 +895,7 @@ public class SimpleAMQQueue implements A
{
QueueEntry oldEntry;
- while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
+ while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
{
if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
{
@@ -1113,6 +1119,17 @@ public class SimpleAMQQueue implements A
return _stateChangeCount.get();
}
+ /** Returns the entry list backing this queue; exposed to subclasses because the field itself is private. */
+ protected QueueEntryList getEntries()
+ {
+ return _entries;
+ }
+
+ protected SubscriptionList getSubscriptionList()
+ {
+ return _subscriptionList;
+ }
+
public static interface QueueEntryFilter
{
@@ -1226,19 +1243,10 @@ public class SimpleAMQQueue implements A
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
- String queueName,
- ServerTransaction txn) throws IllegalArgumentException
+ String destinationQueueName) throws IllegalArgumentException
{
- final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- if (toQueue == null)
- {
- throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
- }
- else if (toQueue == this)
- {
- throw new IllegalArgumentException("The destination queue cant be the same as the source queue");
- }
+ final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1258,65 +1266,68 @@ public class SimpleAMQQueue implements A
});
-
- // Move the messages in on the message store.
- for (final QueueEntry entry : entries)
+ final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+ boolean shouldRollback = true;
+ try
{
- final ServerMessage message = entry.getMessage();
- txn.enqueue(toQueue, message,
- new ServerTransaction.Action()
- {
-
- public void postCommit()
+ // Move the messages in the message store.
+ for (final QueueEntry entry : entries)
+ {
+ final ServerMessage message = entry.getMessage();
+ txn.enqueue(toQueue, message,
+ new ServerTransaction.Action()
{
- try
+
+ public void postCommit()
{
- toQueue.enqueue(message);
+ try
+ {
+ toQueue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
+
+ public void onRollback()
{
- throw new RuntimeException(e);
+ entry.release();
}
- }
-
- public void onRollback()
+ });
+ txn.dequeue(this, message,
+ new ServerTransaction.Action()
{
- entry.release();
- }
- });
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- entry.discard();
- }
-
- public void onRollback()
- {
+ public void postCommit()
+ {
+ entry.discard();
+ }
- }
- });
+ public void onRollback()
+ {
+ }
+ });
+ }
+ txn.commit();
+ shouldRollback = false;
+ }
+ finally
+ {
+ if (shouldRollback)
+ {
+ txn.rollback();
+ }
}
}
public void copyMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
- String queueName,
- final ServerTransaction txn) throws IllegalArgumentException
+ String destinationQueueName) throws IllegalArgumentException
{
- final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- if (toQueue == null)
- {
- throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
- }
- else if (toQueue == this)
- {
- throw new IllegalArgumentException("The destination queue cant be the same as the source queue");
- }
+ final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1334,36 +1345,63 @@ public class SimpleAMQQueue implements A
}
});
-
- // Move the messages in on the message store.
- for (QueueEntry entry : entries)
+ final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore());
+ boolean shouldRollback = true;
+ try
{
- final ServerMessage message = entry.getMessage();
-
- txn.enqueue(toQueue, message, new ServerTransaction.Action()
+ // Copy the messages in the message store.
+ for (QueueEntry entry : entries)
{
- public void postCommit()
+ final ServerMessage message = entry.getMessage();
+
+ txn.enqueue(toQueue, message, new ServerTransaction.Action()
{
- try
+ public void postCommit()
{
- toQueue.enqueue(message);
+ try
+ {
+ toQueue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
+
+ public void onRollback()
{
- throw new RuntimeException(e);
}
- }
-
- public void onRollback()
- {
+ });
- }
- });
+ }
+ txn.commit();
+ shouldRollback = false;
+ }
+ finally
+ {
+ if (shouldRollback)
+ {
+ txn.rollback();
+ }
}
}
+ private AMQQueue getValidatedDestinationQueue(String queueName)
+ {
+ final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ if (toQueue == null)
+ {
+ throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
+ }
+ else if (toQueue == this)
+ {
+ throw new IllegalArgumentException("The destination queue can't be the same as the source queue");
+ }
+ return toQueue;
+ }
+
public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
{
@@ -1543,10 +1581,16 @@ public class SimpleAMQQueue implements A
for(final QueueEntry entry : entries)
{
adapter.setEntry(entry);
- final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter);
+ List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+ if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
+ {
+ queues = _alternateExchange.getAlternateExchange().route(adapter);
+ }
+
final ServerMessage message = entry.getMessage();
- if(rerouteQueues != null && rerouteQueues.size() != 0)
+ if(queues != null && queues.size() != 0)
{
+ final List<? extends BaseQueue> rerouteQueues = queues;
txn.enqueue(rerouteQueues, entry.getMessage(),
new ServerTransaction.Action()
{
@@ -1659,7 +1703,7 @@ public class SimpleAMQQueue implements A
//Overfull log message
_logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
- _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+ _blockedChannels.add(channel);
channel.block(this);
@@ -1692,11 +1736,10 @@ public class SimpleAMQQueue implements A
_logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
}
-
- for(AMQSessionModel c : _blockedChannels.keySet())
+ for(final AMQSessionModel blockedChannel : _blockedChannels)
{
- c.unblock(this);
- _blockedChannels.remove(c);
+ blockedChannel.unblock(this);
+ _blockedChannels.remove(blockedChannel);
}
}
}
@@ -1714,7 +1757,6 @@ public class SimpleAMQQueue implements A
public void deliverAsync(Subscription sub)
{
- //_stateChangeCount.incrementAndGet();
if(_exclusiveSubscriber == null)
{
deliverAsync();
@@ -1890,8 +1932,8 @@ public class SimpleAMQQueue implements A
QueueContext context = (QueueContext) sub.getQueueContext();
if(context != null)
{
- QueueEntry lastSeen = context._lastSeenEntry;
- QueueEntry releasedNode = context._releasedEntry;
+ QueueEntry lastSeen = context.getLastSeenEntry();
+ QueueEntry releasedNode = context.getReleasedEntry();
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
@@ -1913,8 +1955,8 @@ public class SimpleAMQQueue implements A
QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
}
- lastSeen = context._lastSeenEntry;
- releasedNode = context._releasedEntry;
+ lastSeen = context.getLastSeenEntry();
+ releasedNode = context.getReleasedEntry();
node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
}
return node;
@@ -1930,8 +1972,8 @@ public class SimpleAMQQueue implements A
QueueContext context = (QueueContext) sub.getQueueContext();
if(context != null)
{
- QueueEntry releasedNode = context._releasedEntry;
- return releasedNode == null || releasedNode.compareTo(entry) < 0;
+ QueueEntry releasedNode = context.getReleasedEntry();
+ return releasedNode != null && releasedNode.compareTo(entry) < 0;
}
else
{
@@ -2255,8 +2297,7 @@ public class SimpleAMQQueue implements A
public boolean equals(Object o)
{
- return o != null
- && o instanceof SimpleAMQQueue.QueueEntryListener
+ return o instanceof SimpleAMQQueue.QueueEntryListener
&& _sub == ((QueueEntryListener) o)._sub;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java Sat Mar 10 19:22:10 2012
@@ -22,9 +22,16 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
public class SimpleQueueEntryImpl extends QueueEntryImpl
{
- volatile SimpleQueueEntryImpl _next;
+ static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
+ _nextUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
+
+ private volatile SimpleQueueEntryImpl _next;
public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sat Mar 10 19:22:10 2012
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.ServerMessage;
+
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.apache.qpid.server.message.ServerMessage;
public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl>
{
@@ -40,12 +42,11 @@ public class SimpleQueueEntryList implem
private final AMQQueue _queue;
static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
- _nextUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
+ _nextUpdater = SimpleQueueEntryImpl._nextUpdater;
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
public SimpleQueueEntryList(AMQQueue queue)
@@ -55,28 +56,17 @@ public class SimpleQueueEntryList implem
_tail = _head;
}
- void advanceHead()
- {
- SimpleQueueEntryImpl next = _head.getNextNode();
- SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
- if (next == newNext)
- {
- if (_scavenges.incrementAndGet() > _scavengeCount)
- {
- _scavenges.set(0L);
- scavenge();
- }
- }
- }
-
void scavenge()
{
+ SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null);
SimpleQueueEntryImpl next = _head.getNextValidEntry();
- while (next != null)
+ if(hwm != null)
{
- next = next.getNextValidEntry();
+ while (next != null && hwm.compareTo(next)>0)
+ {
+ next = next.getNextValidEntry();
+ }
}
}
@@ -126,7 +116,6 @@ public class SimpleQueueEntryList implem
public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
{
-
private SimpleQueueEntryImpl _lastNode;
QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
@@ -134,10 +123,9 @@ public class SimpleQueueEntryList implem
_lastNode = startNode;
}
-
public boolean atTail()
{
- return _lastNode.getNextNode() == null;
+ return _lastNode.getNextValidEntry() == null;
}
public SimpleQueueEntryImpl getNode()
@@ -147,28 +135,17 @@ public class SimpleQueueEntryList implem
public boolean advance()
{
+ SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry();
- if(!atTail())
+ if(nextValidNode != null)
{
- SimpleQueueEntryImpl nextNode = _lastNode.getNextNode();
- while(nextNode.isDispensed() && nextNode.getNextNode() != null)
- {
- nextNode = nextNode.getNextNode();
- }
- _lastNode = nextNode;
- return true;
-
- }
- else
- {
- return false;
+ _lastNode = nextValidNode;
}
+ return nextValidNode != null;
}
-
}
-
public QueueEntryIteratorImpl iterator()
{
return new QueueEntryIteratorImpl(_head);
@@ -182,7 +159,32 @@ public class SimpleQueueEntryList implem
public void entryDeleted(SimpleQueueEntryImpl queueEntry)
{
- advanceHead();
+ SimpleQueueEntryImpl next = _head.getNextNode();
+ SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
+
+ // the head of the queue has not been deleted, hence the deletion must have been mid queue.
+ if (next == newNext)
+ {
+ SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+ while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
+ unscavengedHWM = _unscavengedHWM.get();
+ }
+ if (_scavenges.incrementAndGet() > _scavengeCount)
+ {
+ _scavenges.set(0L);
+ scavenge();
+ }
+ }
+ else
+ {
+ SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+ if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0))
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, null);
+ }
+ }
}
public int getPriorities()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Sat Mar 10 19:22:10 2012
@@ -19,11 +19,12 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+
public class SortedQueue extends OutOfOrderQueue
{
//Lock object to synchronize enqueue. Used instead of the object
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Sat Mar 10 19:22:10 2012
@@ -21,12 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
-import org.apache.qpid.server.store.StoreContext;
/**
* A sorted implementation of QueueEntryList.
@@ -367,7 +362,7 @@ public class SortedQueueEntryList implem
if(chosenChild != null)
{
- // we have one child (x), we can move it up to replace x;
+ // we have one child (x), we can move it up to replace x
chosenChild.setParent(entry.getParent());
if(chosenChild.getParent() == null)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Sat Mar 10 19:22:10 2012
@@ -21,14 +21,16 @@ package org.apache.qpid.server.queue;
*/
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.transport.TransportException;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
class SubFlushRunner implements Runnable
@@ -67,18 +69,30 @@ class SubFlushRunner implements Runnable
}
catch (AMQException e)
{
- _logger.error(e);
+ _logger.error("Exception during asynchronous delivery by " + toString(), e);
}
- finally
+ catch (final TransportException transe)
{
- CurrentActor.remove();
+ final String errorMessage = "Problem during asynchronous delivery by " + toString();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, transe);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + transe.getMessage());
+ }
}
- _scheduled.compareAndSet(RUNNING, IDLE);
- if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
+ finally
{
- if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ CurrentActor.remove();
+ _scheduled.compareAndSet(RUNNING, IDLE);
+ if ((!complete || _stateChange.compareAndSet(true,false))&& !_sub.isSuspended())
{
- getQueue().execute(this);
+ if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+ {
+ getQueue().execute(this);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org