You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC
svn commit: r1567616 [7/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/
qpid/cpp/bindings/qpid/ruby/ qp...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java Wed Feb 12 13:27:51 2014
@@ -70,7 +70,7 @@ public interface Transaction
public static interface Record
{
- TransactionLogResource getQueue();
+ TransactionLogResource getResource();
EnqueueableMessage getMessage();
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Wed Feb 12 13:27:51 2014
@@ -24,5 +24,7 @@ import java.util.UUID;
public interface TransactionLogResource
{
+ String getName();
public UUID getId();
+ boolean isDurable();
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Wed Feb 12 13:27:51 2014
@@ -25,12 +25,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.List;
@@ -88,7 +90,7 @@ public class AsyncAutoCommitTransaction
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -158,15 +160,15 @@ public class AsyncAutoCommitTransaction
}
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
@@ -210,7 +212,7 @@ public class AsyncAutoCommitTransaction
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed Feb 12 13:27:51 2014
@@ -25,11 +25,13 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.List;
@@ -73,7 +75,7 @@ public class AutoCommitTransaction imple
immediateAction.postCommit();
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -105,15 +107,15 @@ public class AutoCommitTransaction imple
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
@@ -152,7 +154,7 @@ public class AutoCommitTransaction imple
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Wed Feb 12 13:27:51 2014
@@ -22,10 +22,12 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -74,7 +76,7 @@ public class DistributedTransaction impl
}
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
@@ -87,13 +89,13 @@ public class DistributedTransaction impl
}
}
- public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction)
{
if(_branch != null)
{
- for(QueueEntry entry : messages)
+ for(MessageInstance entry : messages)
{
- _branch.dequeue(entry.getQueue(), entry.getMessage());
+ _branch.dequeue(entry.getOwningResource(), entry.getMessage());
}
_branch.addPostTransactionAction(postTransactionAction);
}
@@ -103,13 +105,12 @@ public class DistributedTransaction impl
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
_branch.enqueue(queue, message);
_branch.addPostTransactionAction(postTransactionAction);
- enqueue(Collections.singletonList(queue), message, postTransactionAction);
}
else
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Wed Feb 12 13:27:51 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -335,7 +336,7 @@ public class DtxBranch
{
if(enqueue.isDurable())
{
- _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage());
+ _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
}
}
@@ -344,7 +345,7 @@ public class DtxBranch
{
if(enqueue.isDurable())
{
- _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage());
+ _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage());
}
}
}
@@ -356,31 +357,31 @@ public class DtxBranch
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message)
+ public void dequeue(TransactionLogResource resource, EnqueueableMessage message)
{
- _dequeueRecords.add(new Record(queue, message));
+ _dequeueRecords.add(new Record(resource, message));
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
{
_enqueueRecords.add(new Record(queue, message));
}
private static final class Record implements Transaction.Record
{
- private final BaseQueue _queue;
+ private final TransactionLogResource _resource;
private final EnqueueableMessage _message;
- public Record(BaseQueue queue, EnqueueableMessage message)
+ public Record(TransactionLogResource resource, EnqueueableMessage message)
{
- _queue = queue;
+ _resource = resource;
_message = message;
}
- public BaseQueue getQueue()
+ public TransactionLogResource getResource()
{
- return _queue;
+ return _resource;
}
public EnqueueableMessage getMessage()
@@ -390,7 +391,7 @@ public class DtxBranch
public boolean isDurable()
{
- return _message.isPersistent() && _queue.isDurable();
+ return _message.isPersistent() && _resource.isDurable();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed Feb 12 13:27:51 2014
@@ -21,7 +21,9 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +93,7 @@ public class LocalTransaction implements
_postTransactionActions.add(postTransactionAction);
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
@@ -118,7 +120,7 @@ public class LocalTransaction implements
}
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
@@ -126,10 +128,10 @@ public class LocalTransaction implements
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
@@ -195,7 +197,7 @@ public class LocalTransaction implements
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed Feb 12 13:27:51 2014
@@ -24,8 +24,9 @@ import java.util.Collection;
import java.util.List;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
@@ -79,21 +80,21 @@ public interface ServerTransaction
*
* A store operation will result only for a persistent message on a durable queue.
*/
- void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+ void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
/**
* Dequeue a message(s) from queue(s) registering a post transaction action.
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void dequeue(Collection<QueueEntry> messages, Action postTransactionAction);
+ void dequeue(Collection<MessageInstance> messages, Action postTransactionAction);
/**
* Enqueue a message to a queue registering a post transaction action.
*
* A store operation will result only for a persistent message on a durable queue.
*/
- void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+ void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
/**
* Enqueue a message(s) to queue(s) registering a post transaction action.
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/
('svn:mergeinfo' removed)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Feb 12 13:27:51 2014
@@ -52,8 +52,13 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -105,8 +110,8 @@ public abstract class AbstractVirtualHos
private final DtxRegistry _dtxRegistry;
private final AMQQueueFactory _queueFactory;
-
- private final org.apache.qpid.server.model.VirtualHost _virtualHost;
+ private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
+ private final org.apache.qpid.server.model.VirtualHost _model;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
@@ -125,6 +130,13 @@ public abstract class AbstractVirtualHos
private volatile State _state = State.INITIALISING;
private volatile ScheduledThreadPoolExecutor _houseKeepingTasks;
+ private final Map<String, MessageDestination> _systemNodeDestinations =
+ Collections.synchronizedMap(new HashMap<String,MessageDestination>());
+
+ private final Map<String, MessageSource> _systemNodeSources =
+ Collections.synchronizedMap(new HashMap<String,MessageSource>());
+
+
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
@@ -146,6 +158,7 @@ public abstract class AbstractVirtualHos
_vhostConfig = hostConfig;
_name = _vhostConfig.getName();
_dtxRegistry = new DtxRegistry();
+ _model = virtualHost;
_id = UUIDGenerator.generateVhostUUID(_name);
@@ -162,7 +175,7 @@ public abstract class AbstractVirtualHos
_exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
- _virtualHost = virtualHost;
+ registerSystemNodes();
initialiseStatistics();
@@ -179,7 +192,7 @@ public abstract class AbstractVirtualHos
try
{
- initialiseStorage(_vhostConfig, _virtualHost);
+ initialiseStorage(_vhostConfig, _model);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
@@ -205,6 +218,16 @@ public abstract class AbstractVirtualHos
setState(State.QUIESCED);
}
+ private void registerSystemNodes()
+ {
+ QpidServiceLoader<SystemNodeCreator> qpidServiceLoader = new QpidServiceLoader<SystemNodeCreator>();
+ Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class);
+ for(SystemNodeCreator creator : factories)
+ {
+ creator.register(_systemNodeRegistry);
+ }
+ }
+
abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
@@ -494,6 +517,13 @@ public abstract class AbstractVirtualHos
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ MessageSource systemSource = _systemNodeSources.get(name);
+ return systemSource == null ? getQueue(name) : systemSource;
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return _queueRegistry.getQueue(id);
@@ -577,6 +607,14 @@ public abstract class AbstractVirtualHos
}
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ MessageDestination destination = _systemNodeDestinations.get(name);
+ return destination == null ? getExchange(name) : destination;
+ }
+
@Override
public Exchange getExchange(String name)
{
@@ -690,7 +728,7 @@ public abstract class AbstractVirtualHos
protected void passivate(String reason)
{
- _virtualHost.removeChangeListener(this);
+ _model.removeChangeListener(this);
removeHouseKeepingTasks();
//Stop Connections
@@ -907,7 +945,7 @@ public abstract class AbstractVirtualHos
try
{
initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
- _virtualHost.addChangeListener(this);
+ _model.addChangeListener(this);
finalState = State.ACTIVE;
}
finally
@@ -1059,4 +1097,44 @@ public abstract class AbstractVirtualHos
}
}
+ private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
+ {
+ @Override
+ public void registerSystemNode(final MessageNode node)
+ {
+ if(node instanceof MessageDestination)
+ {
+ _systemNodeDestinations.put(node.getName(), (MessageDestination) node);
+ }
+ if(node instanceof MessageSource)
+ {
+ _systemNodeSources.put(node.getName(), (MessageSource)node);
+ }
+ }
+
+ @Override
+ public void removeSystemNode(final MessageNode node)
+ {
+ if(node instanceof MessageDestination)
+ {
+ _systemNodeDestinations.remove(node.getName());
+ }
+ if(node instanceof MessageSource)
+ {
+ _systemNodeSources.remove(node.getName());
+ }
+ }
+
+ @Override
+ public VirtualHost getVirtualHost()
+ {
+ return AbstractVirtualHost.this;
+ }
+
+ @Override
+ public org.apache.qpid.server.model.VirtualHost getVirtualHostModel()
+ {
+ return _model;
+ }
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Feb 12 13:27:51 2014
@@ -29,6 +29,9 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -48,6 +51,7 @@ public interface VirtualHost extends Dur
String getName();
AMQQueue getQueue(String name);
+ MessageSource getMessageSource(String name);
AMQQueue getQueue(UUID id);
@@ -75,6 +79,8 @@ public interface VirtualHost extends Dur
void removeExchange(Exchange exchange, boolean force) throws AMQException;
+ MessageDestination getMessageDestination(String name);
+
Exchange getExchange(String name);
Exchange getExchange(UUID id);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Wed Feb 12 13:27:51 2014
@@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHa
}
for(Transaction.Record record : enqueues)
{
- final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -141,7 +141,7 @@ public class VirtualHostConfigRecoveryHa
try
{
- queue.enqueue(message, true, null);
+ queue.enqueue(message, null);
ref.release();
}
catch (AMQException e)
@@ -173,13 +173,13 @@ public class VirtualHostConfigRecoveryHa
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getId().toString()));
+ record.getResource().getId().toString()));
}
}
for(Transaction.Record record : dequeues)
{
- final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -223,7 +223,7 @@ public class VirtualHostConfigRecoveryHa
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getId().toString()));
+ record.getResource().getId().toString()));
}
}
@@ -292,7 +292,7 @@ public class VirtualHostConfigRecoveryHa
count = 0;
}
- queue.enqueue(message);
+ queue.enqueue(message,null);
_queueRecoveries.put(queueName, ++count);
}
@@ -312,10 +312,22 @@ public class VirtualHostConfigRecoveryHa
new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return "<<UNKNOWN>>";
+ }
+
+ @Override
public UUID getId()
{
return queueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
txn.commitTranAsync();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Wed Feb 12 13:27:51 2014
@@ -27,7 +27,7 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -113,17 +113,17 @@ public class VirtualHostConfigurationTes
// Check that atest was a priority queue with 5 priorities
AMQQueue atest = vhost.getQueue("atest");
- assertTrue(atest instanceof AMQPriorityQueue);
- assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
+ assertTrue(atest instanceof PriorityQueue);
+ assertEquals(5, ((PriorityQueue) atest).getPriorities());
// Check that ptest was a priority queue with 10 priorities
AMQQueue ptest = vhost.getQueue("ptest");
- assertTrue(ptest instanceof AMQPriorityQueue);
- assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
+ assertTrue(ptest instanceof PriorityQueue);
+ assertEquals(10, ((PriorityQueue) ptest).getPriorities());
// Check that ntest wasn't a priority queue
AMQQueue ntest = vhost.getQueue("ntest");
- assertFalse(ntest instanceof AMQPriorityQueue);
+ assertFalse(ntest instanceof PriorityQueue);
}
public void testQueueAlerts() throws Exception
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,7 @@ import junit.framework.Assert;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -73,7 +74,7 @@ public class TopicExchangeTest extends Q
public void testNoRoute() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -85,7 +86,7 @@ public class TopicExchangeTest extends Q
public void testDirectMatch() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -108,7 +109,7 @@ public class TopicExchangeTest extends Q
public void testStarMatch() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -139,7 +140,7 @@ public class TopicExchangeTest extends Q
public void testHashMatch() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -190,7 +191,7 @@ public class TopicExchangeTest extends Q
public void testMidHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -216,7 +217,7 @@ public class TopicExchangeTest extends Q
public void testMatchAfterHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -255,7 +256,7 @@ public class TopicExchangeTest extends Q
public void testHashAfterHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
@@ -277,7 +278,7 @@ public class TopicExchangeTest extends Q
public void testHashHash() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
@@ -321,7 +322,7 @@ public class TopicExchangeTest extends Q
when(message.getMessageNumber()).thenReturn(messageNumber);
for(BaseQueue q : queues)
{
- q.enqueue(message);
+ q.enqueue(message, null);
}
return queues.size();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java Wed Feb 12 13:27:51 2014
@@ -38,7 +38,7 @@ public class VirtualHostMessagesTest ext
validateLogMessage(log, "VHT-1001", expected);
}
- public void testSubscriptionClosed()
+ public void testVirtualhostClosed()
{
_logMessage = VirtualHostMessages.CLOSED();
List<Object> log = performLog();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Wed Feb 12 13:27:51 2014
@@ -231,7 +231,7 @@ public class AMQQueueFactoryTest extends
false,
attributes);
- assertEquals("Queue not a priority queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
verifyRegisteredQueueCount(1);
}
@@ -246,7 +246,7 @@ public class AMQQueueFactoryTest extends
false,
false,
null);
- assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+ assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
//verify that no alternate exchange or DLQ were produced
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java Wed Feb 12 13:27:51 2014
@@ -28,6 +28,9 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collections;
+import java.util.UUID;
+
public class ConflationQueueListTest extends TestCase
{
private static final String CONFLATION_KEY = "CONFLATION_KEY";
@@ -37,13 +40,15 @@ public class ConflationQueueListTest ext
private static final String TEST_KEY_VALUE2 = "testKeyValue2";
private ConflationQueueList _list;
- private AMQQueue _queue = createTestQueue();
+ private ConflationQueue _queue;
@Override
protected void setUp() throws Exception
{
super.setUp();
- _list = new ConflationQueueList(_queue, CONFLATION_KEY);
+ _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
+ Collections.<String,Object>emptyMap(),CONFLATION_KEY);
+ _list = _queue.getEntries();
}
public void testListHasNoEntries()
@@ -175,7 +180,8 @@ public class ConflationQueueListTest ext
private int countEntries(ConflationQueueList list)
{
- QueueEntryIterator<SimpleQueueEntryImpl> iterator = list.iterator();
+ QueueEntryIterator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList,QueueConsumer<?,ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>> iterator =
+ list.iterator();
int count = 0;
while(iterator.advance())
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb 12 13:27:51 2014
@@ -24,15 +24,23 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -171,6 +179,8 @@ public class MockAMQQueue implements AMQ
return null;
}
+
+
public boolean isDurable()
{
return false;
@@ -202,32 +212,67 @@ public class MockAMQQueue implements AMQ
return _virtualhost;
}
- public String getName()
+ @Override
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
{
- return _name;
+ return false;
}
- public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
+ @Override
+ public void addQueueDeleteTask(final Action task)
+ {
+
+ }
+
+ @Override
+ public void enqueue(final ServerMessage message, final Action action) throws AMQException
{
}
- public void unregisterSubscription(Subscription subscription) throws AMQException
+ @Override
+ public int compareTo(final Object o)
{
+ return 0;
+ }
+
+ @Override
+ public Consumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class messageClass,
+ final String consumerName,
+ final EnumSet options) throws AMQException
+ {
+ return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
+ options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
+ options.contains(Consumer.Option.TRANSIENT), target );
+ }
+
+ public String getName()
+ {
+ return _name;
}
- public Collection<Subscription> getConsumers()
+ public int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action postEnqueueAction)
+ {
+ return 0;
+ }
+
+ public Collection<QueueConsumer> getConsumers()
{
return Collections.emptyList();
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
}
@@ -242,7 +287,7 @@ public class MockAMQQueue implements AMQ
return 0;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return false;
}
@@ -293,50 +338,39 @@ public class MockAMQQueue implements AMQ
return getMessageCount();
}
- public void enqueue(ServerMessage message) throws AMQException
- {
- }
-
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
- {
- }
-
-
- public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
- {
- }
public void requeue(QueueEntry entry)
{
}
- public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+ public void dequeue(QueueEntry entry)
{
}
- public void dequeue(QueueEntry entry, Subscription sub)
- {
- }
-
- public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
+ public boolean resend(QueueEntry entry, QueueConsumer consumer) throws AMQException
{
return false;
}
- public void addQueueDeleteTask(Task task)
+ @Override
+ public void removeQueueDeleteTask(final Action task)
{
+
}
- public void removeQueueDeleteTask(final Task task)
+ @Override
+ public void decrementUnackedMsgCount(final QueueEntry queueEntry)
{
+
}
- public List<QueueEntry> getMessagesOnTheQueue()
+ @Override
+ public List getMessagesOnTheQueue()
{
return null;
}
- public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+ public List getMessagesOnTheQueue(long fromMessageId, long toMessageId)
{
return null;
}
@@ -356,7 +390,7 @@ public class MockAMQQueue implements AMQ
return null;
}
- public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
+ public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
{
return null;
}
@@ -427,12 +461,12 @@ public class MockAMQQueue implements AMQ
return null;
}
- public void flushSubscription(Subscription sub) throws AMQException
+ public void flushConsumer(Consumer sub) throws AMQException
{
}
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(Consumer sub)
{
}
@@ -563,10 +597,6 @@ public class MockAMQQueue implements AMQ
return 0;
}
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
- {
-
- }
public long getUnackedMessageCount()
{
@@ -610,4 +640,5 @@ public class MockAMQQueue implements AMQ
{
return null;
}
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Wed Feb 12 13:27:51 2014
@@ -26,12 +26,16 @@ import static org.mockito.Mockito.when;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import java.util.Collections;
+import java.util.UUID;
+
public class PriorityQueueListTest extends QpidTestCase
{
private static final byte[] PRIORITIES = {4, 5, 5, 4};
- PriorityQueueList _list = new PriorityQueueList(null, 10);
+ PriorityQueueList _list;
private QueueEntry _priority4message1;
private QueueEntry _priority4message2;
@@ -42,6 +46,17 @@ public class PriorityQueueListTest exten
{
QueueEntry[] entries = new QueueEntry[PRIORITIES.length];
+ PriorityQueue queue = new PriorityQueue(UUID.randomUUID(),
+ getName(),
+ false,
+ null,
+ false,
+ false,
+ mock(VirtualHost.class),
+ Collections.<String,Object>emptyMap(),
+ 10);
+ _list = queue.getEntries();
+
for (int i = 0; i < PRIORITIES.length; i++)
{
ServerMessage<?> message = mock(ServerMessage.class);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Wed Feb 12 13:27:51 2014
@@ -21,13 +21,19 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.QueueEntry.EntryState;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.UUID;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -57,6 +63,15 @@ public abstract class QueueEntryImplTest
_queueEntry3 = getQueueEntryImpl(3);
}
+
+ protected void mockLogging()
+ {
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.setDefault(logActor);
+ }
+
+
public void testAcquire()
{
assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
@@ -113,11 +128,19 @@ public abstract class QueueEntryImplTest
*/
private void acquire()
{
- _queueEntry.acquire(new MockSubscription());
+ _queueEntry.acquire(newConsumer());
assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
_queueEntry.isAcquired());
}
+ private QueueConsumer newConsumer()
+ {
+ final ConsumerTarget target = mock(ConsumerTarget.class);
+ when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
+ final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
+ return consumer;
+ }
+
/**
* A helper method to get entry state
*
@@ -140,36 +163,34 @@ public abstract class QueueEntryImplTest
}
/**
- * Tests rejecting a queue entry records the Subscription ID
- * for later verification by isRejectedBy(subscriptionId).
+ * Tests rejecting a queue entry records the Consumer ID
+ * for later verification by isRejectedBy(consumerId).
*/
public void testRejectAndRejectedBy()
{
- Subscription sub = new MockSubscription();
- long subId = sub.getSubscriptionID();
+ QueueConsumer sub = newConsumer();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
- assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+ assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+ assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
- //acquire, reject, and release the message using the subscription
+ //acquire, reject, and release the message using the consumer
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
_queueEntry.reject();
_queueEntry.release();
//verify the rejection is recorded
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
- //repeat rejection using a second subscription
- Subscription sub2 = new MockSubscription();
- long sub2Id = sub2.getSubscriptionID();
+ //repeat rejection using a second consumer
+ QueueConsumer sub2 = newConsumer();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
_queueEntry.reject();
- //verify it still records being rejected by both subscriptions
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+ //verify it still records being rejected by both consumers
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
}
/**
@@ -179,7 +200,9 @@ public abstract class QueueEntryImplTest
{
int numberOfEntries = 5;
QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false,
+ mock(VirtualHost.class), Collections.<String,Object>emptyMap());
+ OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
for(int i = 0; i < numberOfEntries ; i++)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.consumer.Consumer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -32,20 +33,21 @@ import static org.mockito.Mockito.when;
/**
* Abstract test class for QueueEntryList implementations.
*/
-public abstract class QueueEntryListTestBase extends TestCase
+public abstract class QueueEntryListTestBase<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer> extends TestCase
{
- protected static final AMQQueue _testQueue = new MockAMQQueue("test");
- public abstract QueueEntryList<QueueEntry> getTestList();
- public abstract QueueEntryList<QueueEntry> getTestList(boolean newList);
+ public abstract L getTestList();
+ public abstract L getTestList(boolean newList);
public abstract long getExpectedFirstMsgId();
public abstract int getExpectedListLength();
public abstract ServerMessage getTestMessageToAdd() throws AMQException;
public void testGetQueue()
{
- assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue);
+ assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), getTestQueue());
}
+ protected abstract Q getTestQueue();
+
/**
* Test to add a message with properties specific to the queue type.
* @see QueueEntryListTestBase#getTestList()
@@ -54,10 +56,10 @@ public abstract class QueueEntryListTest
*/
public void testAddSpecificMessage() throws AMQException
{
- final QueueEntryList<QueueEntry> list = getTestList();
+ final L list = getTestList();
list.add(getTestMessageToAdd());
- final QueueEntryIterator<?> iter = list.iterator();
+ final QueueEntryIterator<E,Q,L,C> iter = list.iterator();
int count = 0;
while(iter.advance())
{
@@ -75,11 +77,11 @@ public abstract class QueueEntryListTest
*/
public void testAddGenericMessage() throws AMQException
{
- final QueueEntryList<QueueEntry> list = getTestList();
+ final L list = getTestList();
final ServerMessage message = createServerMessage(666l);
list.add(message);
- final QueueEntryIterator<?> iter = list.iterator();
+ final QueueEntryIterator<E,Q,L,C> iter = list.iterator();
int count = 0;
while(iter.advance())
{
@@ -109,8 +111,8 @@ public abstract class QueueEntryListTest
*/
public void testListNext()
{
- final QueueEntryList<QueueEntry> entryList = getTestList();
- QueueEntry entry = entryList.getHead();
+ final L entryList = getTestList();
+ E entry = entryList.getHead();
int count = 0;
while(entryList.next(entry) != null)
{
@@ -127,7 +129,7 @@ public abstract class QueueEntryListTest
*/
public void testIterator()
{
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<E,Q,L,C> iter = getTestList().iterator();
int count = 0;
while(iter.advance())
{
@@ -145,10 +147,10 @@ public abstract class QueueEntryListTest
public void testDequeuedMessagedNotPresentInIterator() throws Exception
{
final int numberOfMessages = getExpectedListLength();
- final QueueEntryList<QueueEntry> entryList = getTestList();
+ final L entryList = getTestList();
// dequeue all even messages
- final QueueEntryIterator<?> it1 = entryList.iterator();
+ final QueueEntryIterator<E,Q,L,C> it1 = entryList.iterator();
int counter = 0;
while (it1.advance())
{
@@ -161,7 +163,7 @@ public abstract class QueueEntryListTest
}
// iterate and check that dequeued messages are not returned by iterator
- final QueueEntryIterator<?> it2 = entryList.iterator();
+ final QueueEntryIterator<E,Q,L,C> it2 = entryList.iterator();
int counter2 = 0;
while(it2.advance())
{
@@ -180,7 +182,7 @@ public abstract class QueueEntryListTest
*/
public void testGetHead()
{
- final QueueEntry head = getTestList().getHead();
+ final E head = getTestList().getHead();
assertNull("Head entry should not contain an actual message", head.getMessage());
assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
.getMessage().getMessageNumber());
@@ -192,16 +194,16 @@ public abstract class QueueEntryListTest
*/
public void testEntryDeleted()
{
- final QueueEntry head = getTestList().getHead();
+ final E head = getTestList().getHead();
- final QueueEntry first = getTestList().next(head);
+ final E first = getTestList().next(head);
first.delete();
- final QueueEntry second = getTestList().next(head);
+ final E second = getTestList().next(head);
assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second
.getMessage().getMessageNumber());
- final QueueEntry third = getTestList().next(first);
+ final E third = getTestList().next(first);
assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second
.getMessage().getMessageNumber(), third.getMessage().getMessageNumber());
}
@@ -215,11 +217,11 @@ public abstract class QueueEntryListTest
*/
public void testIteratorIgnoresDeletedFinalNode() throws Exception
{
- QueueEntryList<QueueEntry> list = getTestList(true);
+ L list = getTestList(true);
int i = 0;
- QueueEntry queueEntry1 = list.add(createServerMessage(i++));
- QueueEntry queueEntry2 = list.add(createServerMessage(i++));
+ E queueEntry1 = list.add(createServerMessage(i++));
+ E queueEntry2 = list.add(createServerMessage(i++));
assertSame(queueEntry2, list.next(queueEntry1));
assertNull(list.next(queueEntry2));
@@ -228,7 +230,7 @@ public abstract class QueueEntryListTest
queueEntry2.delete();
assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted());
- QueueEntryIterator<QueueEntry> iter = list.iterator();
+ QueueEntryIterator<E,Q,L,C> iter = list.iterator();
//verify the iterator isn't 'atTail', can advance, and returns the 1st QueueEntry
assertFalse("Iterator should not have been 'atTail'", iter.atTail());
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Wed Feb 12 13:27:51 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
import junit.framework.Assert;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
/**
* Test extension of SortedQueueEntryList that provides data structure validation tests.
@@ -30,22 +30,28 @@ import org.apache.qpid.server.queue.Sort
*/
public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
{
- public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName)
+ public SelfValidatingSortedQueueEntryList(SortedQueue queue, String propertyName)
{
super(queue, propertyName);
}
+ @Override
+ public SortedQueue getQueue()
+ {
+ return super.getQueue();
+ }
+
@Override /** Overridden to automatically check queue properties before and after. */
- public SortedQueueEntryImpl add(final ServerMessage message)
+ public SortedQueueEntry add(final ServerMessage message)
{
assertQueueProperties(); //before add
- final SortedQueueEntryImpl result = super.add(message);
+ final SortedQueueEntry result = super.add(message);
assertQueueProperties(); //after add
return result;
}
@Override /** Overridden to automatically check queue properties before and after. */
- public void entryDeleted(SortedQueueEntryImpl entry)
+ public void entryDeleted(SortedQueueEntry entry)
{
assertQueueProperties(); //before delete
super.entryDeleted(entry);
@@ -73,7 +79,7 @@ public class SelfValidatingSortedQueueEn
assertTreeIntegrity(getRoot());
}
- public void assertTreeIntegrity(final SortedQueueEntryImpl node)
+ public void assertTreeIntegrity(final SortedQueueEntry node)
{
if(node == null)
{
@@ -109,7 +115,7 @@ public class SelfValidatingSortedQueueEn
assertLeavesSameBlackPath(getRoot());
}
- public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node)
+ public int assertLeavesSameBlackPath(final SortedQueueEntry node)
{
if(node == null)
{
@@ -133,7 +139,7 @@ public class SelfValidatingSortedQueueEn
assertChildrenOfRedAreBlack(getRoot());
}
- public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node)
+ public void assertChildrenOfRedAreBlack(final SortedQueueEntry node)
{
if(node == null)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Wed Feb 12 13:27:51 2014
@@ -21,8 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.UUID;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -30,7 +36,20 @@ import static org.mockito.Mockito.when;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
{
- private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ private OrderedQueueEntryList queueEntryList;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ mockLogging();
+
+ StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null);
+
+ queueEntryList = queue.getEntries();
+
+ super.setUp();
+ }
+
public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Wed Feb 12 13:27:51 2014
@@ -21,20 +21,26 @@ package org.apache.qpid.server.queue;
import java.util.Collections;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
+import java.util.UUID;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SortedQueueEntryListTest extends QueueEntryListTestBase
+public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>>
{
private static SelfValidatingSortedQueueEntryList _sqel;
+
public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144",
" 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81",
"195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45",
@@ -62,16 +68,30 @@ public class SortedQueueEntryListTest ex
private final static String keysSorted[] = keys.clone();
+ private SortedQueue _testQueue;
+
@Override
protected void setUp() throws Exception
{
+ mockLogging();
+
+ // Create test list
+ _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ {
+
+ @Override
+ public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+ {
+ return new SelfValidatingSortedQueueEntryList(queue, "KEY");
+ }
+ });
+ _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries();
+
super.setUp();
// Create result array
Arrays.sort(keysSorted);
- // Create test list
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
// Build test list
long messageId = 0L;
@@ -83,14 +103,22 @@ public class SortedQueueEntryListTest ex
}
+ protected void mockLogging()
+ {
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.setDefault(logActor);
+ }
+
+
@Override
- public QueueEntryList getTestList()
+ public SortedQueueEntryList getTestList()
{
return getTestList(false);
}
@Override
- public QueueEntryList getTestList(boolean newList)
+ public SortedQueueEntryList getTestList(boolean newList)
{
if(newList)
{
@@ -117,6 +145,12 @@ public class SortedQueueEntryListTest ex
return generateTestMessage(1, "test value");
}
+ @Override
+ protected SortedQueue getTestQueue()
+ {
+ return _testQueue;
+ }
+
private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException
{
final ServerMessage message = mock(ServerMessage.class);
@@ -138,7 +172,7 @@ public class SortedQueueEntryListTest ex
super.testIterator();
// Test sorted order of list
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count = 0;
while(iter.advance())
{
@@ -147,12 +181,12 @@ public class SortedQueueEntryListTest ex
}
}
- private Object getSortedKeyValue(QueueEntryIterator<?> iter)
+ private Object getSortedKeyValue(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
{
return (iter.getNode()).getMessage().getMessageHeader().getHeader("KEY");
}
- private Long getMessageId(QueueEntryIterator<?> iter)
+ private Long getMessageId(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
{
return (iter.getNode()).getMessage().getMessageNumber();
}
@@ -169,7 +203,7 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -190,12 +224,13 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter));
- assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); }
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));
+ }
}
public void testAscendingSortKeys() throws Exception
@@ -211,7 +246,7 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -234,7 +269,7 @@ public class SortedQueueEntryListTest ex
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -251,7 +286,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "B");
@@ -271,7 +306,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "A");
@@ -290,7 +325,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "C");
@@ -322,7 +357,7 @@ public class SortedQueueEntryListTest ex
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "D");
@@ -362,7 +397,7 @@ public class SortedQueueEntryListTest ex
validateEntry(entry, "D", 2);
}
- private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId)
+ private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)
{
assertEquals("Sorted queue entry value is not as expected",
expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Wed Feb 12 13:27:51 2014
@@ -464,7 +464,7 @@ public abstract class AbstractDurableCon
}
@Override
- public TransactionLogResource getQueue()
+ public TransactionLogResource getResource()
{
return _queue;
}
@@ -505,7 +505,7 @@ public abstract class AbstractDurableCon
{
return false;
}
- if (_queue == null && other.getQueue() != null)
+ if (_queue == null && other.getResource() != null)
{
return false;
}
@@ -513,7 +513,7 @@ public abstract class AbstractDurableCon
{
return false;
}
- return _queue.getId().equals(other.getQueue().getId());
+ return _queue.getId().equals(other.getResource().getId());
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Wed Feb 12 13:27:51 2014
@@ -158,6 +158,12 @@ public abstract class MessageStoreQuotaE
return _transactionResource;
}
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
private static class TestMessage implements EnqueueableMessage
{
private final StoredMessage<?> _handle;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java Wed Feb 12 13:27:51 2014
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ByteBufferOutputStream;
public class TestMessageMetaData implements StorableMessageMetaData
@@ -72,7 +71,7 @@ public class TestMessageMetaData impleme
}
@Override
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
int oldPosition = dest.position();
try
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Wed Feb 12 13:27:51 2014
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -47,7 +48,7 @@ public class AutoCommitTransactionTest e
private MessageStore _transactionLog;
private AMQQueue _queue;
private List<AMQQueue> _queues;
- private Collection<QueueEntry> _queueEntries;
+ private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action;
private MockStoreTransaction _storeTransaction;
@@ -373,9 +374,9 @@ public class AutoCommitTransactionTest e
assertFalse("Rollback action must be fired", _action.isRollbackActionFired());
}
- private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+ private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
- Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
@@ -384,7 +385,7 @@ public class AutoCommitTransactionTest e
final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
- queueEntries.add(new MockQueueEntry()
+ queueEntries.add(new MockMessageInstance()
{
@Override
@@ -394,7 +395,7 @@ public class AutoCommitTransactionTest e
}
@Override
- public AMQQueue getQueue()
+ public TransactionLogResource getOwningResource()
{
return queue;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org