You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 16:42:53 UTC
svn commit: r1295627 [6/12] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbsto...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Thu Mar 1 15:42:44 2012
@@ -1,12 +1,14 @@
package org.apache.qpid.server.queue;
import java.util.Map;
+
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
public abstract class OutOfOrderQueue extends SimpleAMQQueue
{
+
protected OutOfOrderQueue(String name, boolean durable, String owner,
boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
@@ -27,11 +29,8 @@ public abstract class OutOfOrderQueue ex
QueueContext context = (QueueContext) subscription.getQueueContext();
if(context != null)
{
- QueueEntry subnode = context._lastSeenEntry;
QueueEntry released = context._releasedEntry;
-
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired()
- && (released == null || released.compareTo(entry) > 0))
+ while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
{
@@ -39,14 +38,11 @@ public abstract class OutOfOrderQueue ex
}
else
{
- subnode = context._lastSeenEntry;
released = context._releasedEntry;
}
-
}
}
}
-
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Mar 1 15:42:44 2012
@@ -75,6 +75,11 @@ public interface QueueEntry extends Comp
{
return State.AVAILABLE;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -85,6 +90,11 @@ public interface QueueEntry extends Comp
{
return State.DEQUEUED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -95,6 +105,11 @@ public interface QueueEntry extends Comp
{
return State.DELETED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class ExpiredState extends EntryState
@@ -104,6 +119,11 @@ public interface QueueEntry extends Comp
{
return State.EXPIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -113,6 +133,11 @@ public interface QueueEntry extends Comp
{
return State.ACQUIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class SubscriptionAcquiredState extends EntryState
@@ -134,6 +159,11 @@ public interface QueueEntry extends Comp
{
return _subscription;
}
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
public final class SubscriptionAssignedState extends EntryState
@@ -155,6 +185,12 @@ public interface QueueEntry extends Comp
{
return _subscription;
}
+
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Mar 1 15:42:44 2012
@@ -35,6 +35,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
public abstract class QueueEntryImpl implements QueueEntry
@@ -420,7 +421,7 @@ public abstract class QueueEntryImpl imp
if (rerouteQueues != null && rerouteQueues.size() != 0)
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
{
@@ -443,7 +444,8 @@ public abstract class QueueEntryImpl imp
{
}
- });
+ }, 0L);
+
txn.dequeue(currentQueue, message, new ServerTransaction.Action()
{
public void postCommit()
@@ -456,8 +458,10 @@ public abstract class QueueEntryImpl imp
}
});
- }
+
+ txn.commit();
}
+ }
}
public boolean isQueueDeleted()
@@ -545,4 +549,11 @@ public abstract class QueueEntryImpl imp
_deliveryCountUpdater.decrementAndGet(this);
}
+ public String toString()
+ {
+ return "QueueEntryImpl{" +
+ "_entryId=" + _entryId +
+ ", _state=" + _state +
+ '}';
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Thu Mar 1 15:42:44 2012
@@ -35,4 +35,6 @@ public interface QueueEntryList<Q extend
Q getHead();
void entryDeleted(Q queueEntry);
+
+ int getPriorities();
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Thu Mar 1 15:42:44 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.QueueRunner;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -38,7 +37,7 @@ import java.util.concurrent.atomic.Atomi
* when straight-through delivery of a message to a subscription isn't
* possible during the enqueue operation.
*/
-public class QueueRunner implements ReadWriteRunnable
+public class QueueRunner implements Runnable
{
private static final Logger _logger = Logger.getLogger(QueueRunner.class);
@@ -51,13 +50,11 @@ public class QueueRunner implements Read
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
- private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
private final AtomicLong _lastRunAgain = new AtomicLong();
private final AtomicLong _lastRunTime = new AtomicLong();
- private long _runs;
private long _continues;
public QueueRunner(SimpleAMQQueue queue)
@@ -65,8 +62,6 @@ public class QueueRunner implements Read
_queue = queue;
}
- private int trouble = 0;
-
public void run()
{
if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
@@ -103,16 +98,6 @@ public class QueueRunner implements Read
}
}
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
public String toString()
{
return "QueueRunner-" + _queue.getLogActor();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Mar 1 15:42:44 2012
@@ -18,22 +18,33 @@
*/
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -43,8 +54,12 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
+import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
+import org.apache.qpid.server.subscription.MessageGroupManager;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -52,27 +67,15 @@ import org.apache.qpid.server.txn.LocalT
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.JMException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+ private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
+ private static final String QPID_NO_GROUP = "qpid.no-group";
+ // TODO - should make this configurable at the vhost / broker level
+ private static final int DEFAULT_MAX_GROUPS = 255;
private final VirtualHost _virtualHost;
@@ -164,7 +167,7 @@ public class SimpleAMQQueue implements A
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+ private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -190,6 +193,7 @@ public class SimpleAMQQueue implements A
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+ private final MessageGroupManager _messageGroupManager;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
@@ -245,25 +249,15 @@ public class SimpleAMQQueue implements A
_logSubject = new QueueLogSubject(this);
_logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
- // Log the correct creation message
-
- // Extract the number of priorities for this Queue.
- // Leave it as 0 if we are a SimpleQueueEntryList
- int priorities = 0;
- if (entryListFactory instanceof PriorityQueueList.Factory)
- {
- priorities = ((PriorityQueueList)_entries).getPriorities();
- }
-
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
CurrentActor.get().message(_logSubject,
QueueMessages.CREATED(String.valueOf(_owner),
- priorities,
- _owner != null,
- autoDelete,
- durable, !durable,
- priorities > 0));
+ _entries.getPriorities(),
+ _owner != null,
+ autoDelete,
+ durable, !durable,
+ _entries.getPriorities() > 0));
getConfigStore().addConfiguredObject(this);
@@ -277,6 +271,26 @@ public class SimpleAMQQueue implements A
_logger.error("AMQQueue MBean creation has failed ", e);
}
+ if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ {
+ if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
+ {
+ String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP));
+ _messageGroupManager =
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+ defaultGroup == null ? QPID_NO_GROUP : defaultGroup,
+ this);
+ }
+ else
+ {
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+ }
+ }
+ else
+ {
+ _messageGroupManager = null;
+ }
+
resetNotifications();
}
@@ -292,7 +306,7 @@ public class SimpleAMQQueue implements A
// ------ Getters and Setters
- public void execute(ReadWriteRunnable runnable)
+ public void execute(Runnable runnable)
{
_asyncDelivery.execute(runnable);
}
@@ -491,6 +505,11 @@ public class SimpleAMQQueue implements A
setExclusiveSubscriber(null);
subscription.setQueueContext(null);
+ if(_messageGroupManager != null)
+ {
+ resetSubPointersForGroups(subscription, true);
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
@@ -515,6 +534,34 @@ public class SimpleAMQQueue implements A
}
+ public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ {
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ if(clearAssignments)
+ {
+ _messageGroupManager.clearAssignments(subscription);
+ }
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+ }
+
public boolean getDeleteOnNoConsumers()
{
return _deleteOnNoConsumers;
@@ -592,7 +639,16 @@ public class SimpleAMQQueue implements A
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
- incrementTxnEnqueueStats(message);
+ enqueue(message, false, action);
+ }
+
+ public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+ {
+
+ if(transactional)
+ {
+ incrementTxnEnqueueStats(message);
+ }
incrementQueueCount();
incrementQueueSize(message);
@@ -689,21 +745,20 @@ public class SimpleAMQQueue implements A
{
try
{
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
- {
- if (!sub.wouldSuspend(entry))
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
+ && mightAssign(sub, entry)
+ && !sub.wouldSuspend(entry))
+ {
+ if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+ {
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
{
- if (sub.acquires() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
- {
- deliverMessage(sub, entry, false);
- }
+ deliverMessage(sub, entry, false);
}
}
}
@@ -714,6 +769,20 @@ public class SimpleAMQQueue implements A
}
}
+ private boolean assign(final Subscription sub, final QueueEntry entry)
+ {
+ return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+ }
+
+
+ private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ {
+ if(_messageGroupManager == null || !sub.acquires())
+ return true;
+ Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ return (assigned == null) || (assigned == sub);
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -739,13 +808,8 @@ public class SimpleAMQQueue implements A
private void incrementTxnEnqueueStats(final ServerMessage message)
{
- SessionConfig session = message.getSessionConfig();
-
- if(session !=null && session.isTransactional())
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
+ _msgTxnEnqueues.incrementAndGet();
+ _byteTxnEnqueues.addAndGet(message.getSize());
}
private void incrementTxnDequeueStats(QueueEntry entry)
@@ -1057,6 +1121,8 @@ public class SimpleAMQQueue implements A
public boolean filterComplete();
}
+
+
public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
return getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1111,6 +1177,24 @@ public class SimpleAMQQueue implements A
}
+ public void visit(final Visitor visitor)
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ if(!node.isDispensed())
+ {
+ if(visitor.visit(node))
+ {
+ break;
+ }
+ }
+ }
+ }
+
/**
* Returns a list of QueueEntries from a given range of queue positions, e.g. messages 5 to 10 on the queue.
*
@@ -1487,7 +1571,7 @@ public class SimpleAMQQueue implements A
{
}
- });
+ }, 0L);
txn.dequeue(this, entry.getMessage(),
new ServerTransaction.Action()
{
@@ -1565,7 +1649,7 @@ public class SimpleAMQQueue implements A
}
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
if(_capacity != 0l)
{
@@ -1575,10 +1659,9 @@ public class SimpleAMQQueue implements A
//Overfull log message
_logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
- if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
- {
- channel.block(this);
- }
+ _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+
+ channel.block(this);
if(_atomicQueueSize.get() <= _flowResumeCapacity)
{
@@ -1610,7 +1693,7 @@ public class SimpleAMQQueue implements A
}
- for(AMQChannel c : _blockedChannels.keySet())
+ for(AMQSessionModel c : _blockedChannels.keySet())
{
c.unblock(this);
_blockedChannels.remove(c);
@@ -1752,11 +1835,11 @@ public class SimpleAMQQueue implements A
if (node != null && node.isAvailable())
{
- if (sub.hasInterest(node))
+ if (sub.hasInterest(node) && mightAssign(sub, node))
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !node.acquire(sub))
+ if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -1813,7 +1896,8 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
+ !mightAssign(sub,node)))
{
if (expired)
{
@@ -1841,6 +1925,19 @@ public class SimpleAMQQueue implements A
}
}
+ public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ {
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry releasedNode = context._releasedEntry;
+ return releasedNode == null || releasedNode.compareTo(entry) < 0;
+ }
+ else
+ {
+ return false;
+ }
+ }
/**
* Used by queue Runners to asynchronously deliver messages to consumers.
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Thu Mar 1 15:42:44 2012
@@ -185,6 +185,11 @@ public class SimpleQueueEntryList implem
advanceHead();
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
static class Factory implements QueueEntryListFactory
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Thu Mar 1 15:42:44 2012
@@ -51,13 +51,11 @@ public class SortedQueueEntryList implem
_propertyName = propertyName;
}
- @Override
public AMQQueue getQueue()
{
return _queue;
}
- @Override
public SortedQueueEntryImpl add(final ServerMessage message)
{
synchronized(_lock)
@@ -286,7 +284,6 @@ public class SortedQueueEntryList implem
return (node == null ? Colour.BLACK : node.getColour()) == colour;
}
- @Override
public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
{
synchronized(_lock)
@@ -316,13 +313,11 @@ public class SortedQueueEntryList implem
}
}
- @Override
public QueueEntryIterator<SortedQueueEntryImpl> iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- @Override
public SortedQueueEntryImpl getHead()
{
return _head;
@@ -333,7 +328,6 @@ public class SortedQueueEntryList implem
return _root;
}
- @Override
public void entryDeleted(final SortedQueueEntryImpl entry)
{
synchronized(_lock)
@@ -431,6 +425,11 @@ public class SortedQueueEntryList implem
}
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
/**
* Swaps the position of the node in the tree with its successor
* (that is the node with the next highest key)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Thu Mar 1 15:42:44 2012
@@ -21,18 +21,17 @@ package org.apache.qpid.server.queue;
*/
-import org.apache.qpid.pool.ReadWriteRunnable;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.subscription.Subscription;
+
-class SubFlushRunner implements ReadWriteRunnable
+class SubFlushRunner implements Runnable
{
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
@@ -90,16 +89,6 @@ class SubFlushRunner implements ReadWrit
return (SimpleAMQQueue) _sub.getQueue();
}
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
public String toString()
{
return "SubFlushRunner-" + _sub.getLogActor();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Mar 1 15:42:44 2012
@@ -148,7 +148,7 @@ public abstract class ApplicationRegistr
BrokerConfig broker = new BrokerConfigAdapter(instance);
- SystemConfig system = (SystemConfig) store.getRoot();
+ SystemConfig system = store.getRoot();
system.addBroker(broker);
instance.setBroker(broker);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Thu Mar 1 15:42:44 2012
@@ -165,7 +165,6 @@ public class BrokerConfigAdapter impleme
/**
* @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
*/
- @Override
public List<String> getFeatures()
{
final List<String> features = new ArrayList<String>();
@@ -176,4 +175,16 @@ public class BrokerConfigAdapter impleme
return Collections.unmodifiableList(features);
}
+
+ @Override
+ public String toString()
+ {
+ return "BrokerConfigAdapter{" +
+ "_id=" + _id +
+ ", _system=" + _system +
+ ", _vhosts=" + _vhosts +
+ ", _createTime=" + _createTime +
+ ", _federationTag='" + _federationTag + '\'' +
+ '}';
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Thu Mar 1 15:42:44 2012
@@ -32,11 +32,9 @@ import static org.apache.qpid.server.sec
import java.net.SocketAddress;
import java.security.Principal;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.Subject;
@@ -192,6 +190,15 @@ public class SecurityManager
return _logger;
}
+ private static class CachedPropertiesMap extends LinkedHashMap<String, PublishAccessCheck>
+ {
+ @Override
+ protected boolean removeEldestEntry(Entry<String, PublishAccessCheck> eldest)
+ {
+ return size() >= 200;
+ }
+ }
+
private abstract class AccessCheck
{
abstract Result allowed(SecurityPlugin plugin);
@@ -204,56 +211,61 @@ public class SecurityManager
return true;
}
- HashMap<String, SecurityPlugin> remainingPlugins = new HashMap<String, SecurityPlugin>(_globalPlugins);
+ Map<String, SecurityPlugin> remainingPlugins = _globalPlugins.isEmpty()
+ ? Collections.<String, SecurityPlugin>emptyMap()
+ : _hostPlugins.isEmpty() ? _globalPlugins : new HashMap<String, SecurityPlugin>(_globalPlugins);
- for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+ if(!_hostPlugins.isEmpty())
{
- // Create set of global only plugins
- SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
- if (globalPlugin != null)
- {
- remainingPlugins.remove(hostEntry.getKey());
- }
-
- Result host = checker.allowed(hostEntry.getValue());
-
- if (host == Result.DENIED)
- {
- // Something vetoed the access, we're done
- return false;
- }
-
- // host allow overrides global allow, so only check global on abstain or defer
- if (host != Result.ALLOWED)
- {
- if (globalPlugin == null)
- {
- if (host == Result.DEFER)
- {
- host = hostEntry.getValue().getDefault();
- }
- if (host == Result.DENIED)
+ for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+ {
+ // Create set of global only plugins
+ SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
+ if (globalPlugin != null)
+ {
+ remainingPlugins.remove(hostEntry.getKey());
+ }
+
+ Result host = checker.allowed(hostEntry.getValue());
+
+ if (host == Result.DENIED)
+ {
+ // Something vetoed the access, we're done
+ return false;
+ }
+
+ // host allow overrides global allow, so only check global on abstain or defer
+ if (host != Result.ALLOWED)
+ {
+ if (globalPlugin == null)
{
- return false;
+ if (host == Result.DEFER)
+ {
+ host = hostEntry.getValue().getDefault();
+ }
+ if (host == Result.DENIED)
+ {
+ return false;
+ }
}
- }
- else
- {
- Result global = checker.allowed(globalPlugin);
- if (global == Result.DEFER)
- {
- global = globalPlugin.getDefault();
- }
- if (global == Result.ABSTAIN && host == Result.DEFER)
- {
- global = hostEntry.getValue().getDefault();
- }
- if (global == Result.DENIED)
+ else
{
- return false;
+ Result global = checker.allowed(globalPlugin);
+ if (global == Result.DEFER)
+ {
+ global = globalPlugin.getDefault();
+ }
+ if (global == Result.ABSTAIN && host == Result.DEFER)
+ {
+ global = hostEntry.getValue().getDefault();
+ }
+ if (global == Result.DENIED)
+ {
+ return false;
+ }
}
- }
- }
+ }
+ }
}
for (SecurityPlugin plugin : remainingPlugins.values())
@@ -371,15 +383,41 @@ public class SecurityManager
});
}
- public boolean authorisePublish(final boolean immediate, final String routingKey, final String exchangeName)
+
+ // Caches of reusable publish access checks, keyed first by exchange name and then by routing key.
+ // authorisePublish picks this map for immediate publishes and _publishPropsCache otherwise.
+ // NOTE(review): authorisePublish only ever adds entries; no eviction is visible here, so the
+ // caches appear to grow with every distinct exchange/routing-key pair - confirm this is intended.
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _immediatePublishPropsCache
+ = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+ // Same structure as _immediatePublishPropsCache, used when the publish is not flagged immediate.
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _publishPropsCache
+ = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+
+ public boolean authorisePublish(final boolean immediate, String routingKey, String exchangeName)
{
- return checkAllPlugins(new AccessCheck()
+ if(routingKey == null)
{
- Result allowed(SecurityPlugin plugin)
+ routingKey = "";
+ }
+ if(exchangeName == null)
+ {
+ exchangeName = "";
+ }
+ PublishAccessCheck check;
+ ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> cache =
+ immediate ? _immediatePublishPropsCache : _publishPropsCache;
+
+ ConcurrentHashMap<String, PublishAccessCheck> exchangeMap = cache.get(exchangeName);
+ if(exchangeMap == null)
+ {
+ cache.putIfAbsent(exchangeName, new ConcurrentHashMap<String, PublishAccessCheck>());
+ exchangeMap = cache.get(exchangeName);
+ }
+
+ check = exchangeMap.get(routingKey);
+ if(check == null)
{
- return plugin.authorise(PUBLISH, EXCHANGE, new ObjectProperties(exchangeName, routingKey, immediate));
+ check = new PublishAccessCheck(new ObjectProperties(exchangeName, routingKey, immediate));
+ exchangeMap.put(routingKey, check);
}
- });
+
+ return checkAllPlugins(check);
}
public boolean authorisePurge(final AMQQueue queue)
@@ -413,4 +451,19 @@ public class SecurityManager
return current;
}
+
+ /**
+ * Access check for a PUBLISH operation on an EXCHANGE with a fixed set of object properties.
+ * The properties are captured once at construction so the same check instance can be handed
+ * to checkAllPlugins repeatedly.
+ */
+ private class PublishAccessCheck extends AccessCheck
+ {
+ /** Properties passed unchanged to every plugin consulted by this check. */
+ private final ObjectProperties _props;
+
+ /**
+ * @param props the object properties to present to each plugin
+ */
+ public PublishAccessCheck(ObjectProperties props)
+ {
+ _props = props;
+ }
+
+ /**
+ * Asks a single plugin whether the publish is authorised.
+ *
+ * @param plugin the plugin to consult
+ * @return the plugin's result for PUBLISH on EXCHANGE with the captured properties
+ */
+ Result allowed(SecurityPlugin plugin)
+ {
+ return plugin.authorise(PUBLISH, EXCHANGE, _props);
+ }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Thu Mar 1 15:42:44 2012
@@ -18,10 +18,7 @@
*/
package org.apache.qpid.server.security.access;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.lang.StringUtils;
import org.apache.qpid.framing.AMQShortString;
@@ -35,7 +32,7 @@ import org.apache.qpid.server.queue.AMQQ
* {@link #equals(Object)} and {@link #hashCode()} are intended for use in maps. This is due to the wildcard matching
* described above.
*/
-public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
+public class ObjectProperties
{
/** serialVersionUID */
private static final long serialVersionUID = -1356019341374170495L;
@@ -93,7 +90,9 @@ public class ObjectProperties extends Ha
return properties;
}
}
-
+
+ /** Backing store for the property values; replaces the HashMap this class previously extended. */
+ private final EnumMap<Property, String> _properties = new EnumMap<Property, String>(Property.class);
+
public static List<String> getAllPropertyNames()
{
List<String> properties = new ArrayList<String>();
@@ -113,7 +112,7 @@ public class ObjectProperties extends Ha
{
super();
- putAll(copy);
+ _properties.putAll(copy._properties);
}
public ObjectProperties(String name)
@@ -231,7 +230,7 @@ public class ObjectProperties extends Ha
public List<String> getPropertyNames()
{
List<String> properties = new ArrayList<String>();
- for (Property property : keySet())
+ for (Property property : _properties.keySet())
{
properties.add(property.getName());
}
@@ -240,17 +239,22 @@ public class ObjectProperties extends Ha
public Boolean isSet(Property key)
{
- return containsKey(key) && Boolean.valueOf(get(key));
+ return _properties.containsKey(key) && Boolean.valueOf(_properties.get(key));
}
-
+
+ /**
+ * Returns the stored value for a property.
+ *
+ * @param key the property to look up
+ * @return the value held in the backing map, or null when the property has no mapping
+ */
+ public String get(Property key)
+ {
+ return _properties.get(key);
+ }
+
public String getName()
{
- return get(Property.NAME);
+ return _properties.get(Property.NAME);
}
public void setName(String name)
{
- put(Property.NAME, name);
+ _properties.put(Property.NAME, name);
}
public void setName(AMQShortString name)
@@ -262,39 +266,38 @@ public class ObjectProperties extends Ha
{
return put(key, value == null ? "" : value.asString());
}
-
- @Override
+
public String put(Property key, String value)
{
- return super.put(key, value == null ? "" : value.trim());
+ return _properties.put(key, value == null ? "" : value.trim());
}
public void put(Property key, Boolean value)
{
if (value != null)
{
- super.put(key, Boolean.toString(value));
+ _properties.put(key, Boolean.toString(value));
}
}
public boolean matches(ObjectProperties properties)
{
- if (properties.keySet().isEmpty())
+ if (properties._properties.keySet().isEmpty())
{
return true;
}
- if (!keySet().containsAll(properties.keySet()))
+ if (!_properties.keySet().containsAll(properties._properties.keySet()))
{
return false;
}
- for (Map.Entry<Property,String> entry : properties.entrySet())
+ for (Map.Entry<Property,String> entry : properties._properties.entrySet())
{
Property key = entry.getKey();
String ruleValue = entry.getValue();
- String thisValue = get(key);
+ String thisValue = _properties.get(key);
if (!valueMatches(thisValue, ruleValue))
{
@@ -315,4 +318,29 @@ public class ObjectProperties extends Ha
&& thisValue.length() > ruleValue.length()
&& thisValue.startsWith(ruleValue.substring(0, ruleValue.length() - 2)));
}
+
+    /**
+     * Two instances are equal when they are of exactly the same class and hold equal property maps.
+     *
+     * @param o the object to compare with
+     * @return true if {@code o} is an ObjectProperties of the same class with equal properties
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        final ObjectProperties other = (ObjectProperties) o;
+        if (_properties == null)
+        {
+            return other._properties == null;
+        }
+        return _properties.equals(other._properties);
+    }
+
+    /**
+     * Hash code derived solely from the property map, matching {@link #equals(Object)}.
+     *
+     * @return the property map's hash code, or 0 if there is no map
+     */
+    @Override
+    public int hashCode()
+    {
+        if (_properties == null)
+        {
+            return 0;
+        }
+        return _properties.hashCode();
+    }
+
+ /**
+ * @return the string form of the backing property map
+ */
+ @Override
+ public String toString()
+ {
+ return _properties.toString();
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Thu Mar 1 15:42:44 2012
@@ -21,6 +21,9 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.framing.FieldTable;
public interface ConfigurationRecoveryHandler
@@ -42,7 +45,19 @@ public interface ConfigurationRecoveryHa
public static interface BindingRecoveryHandler
{
void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
- void completeBindingRecovery();
+ BrokerLinkRecoveryHandler completeBindingRecovery();
+ }
+
+ public static interface BrokerLinkRecoveryHandler
+ {
+ BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
+ void completeBrokerLinkRecovery();
+ }
+
+ public static interface BridgeRecoveryHandler
+ {
+ void bridge(UUID id, long createTime, Map<String,String> arguments);
+ void completeBridgeRecoveryForLink();
}
public static interface QueueEntryRecoveryHandler
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Mar 1 15:42:44 2012
@@ -21,7 +21,9 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
@@ -36,7 +38,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,11 +52,14 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -60,7 +68,7 @@ import org.apache.qpid.server.queue.AMQQ
*
* TODO extract the SQL statements into a generic JDBC store
*/
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -80,6 +88,10 @@ public class DerbyMessageStore implement
private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
+ private static final String LINKS_TABLE_NAME = "QPID_LINKS";
+ private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
+
+
private static final int DB_VERSION = 3;
@@ -135,6 +147,49 @@ public class DerbyMessageStore implement
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+    /** DDL for the broker-link table; a link is keyed by the two 64-bit halves of its UUID. */
+    private static final String CREATE_LINKS_TABLE =
+            "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
+            + " id_msb bigint not null,"
+            + " create_time bigint not null,"
+            + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
+    /**
+     * Selects one link by id. Both UUID halves are bound parameters; the predicate on id_msb
+     * previously lacked its "= ?" and therefore was not a valid statement.
+     */
+    private static final String SELECT_FROM_LINKS =
+            "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+    /** Deletes one link by id. */
+    private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+            + " WHERE id_lsb = ? and id_msb = ?";
+    /** Selects every stored link; column order is id_lsb, id_msb, create_time, arguments. */
+    private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+            + "arguments FROM " + LINKS_TABLE_NAME;
+    /** Existence probe for a link id, used before inserting. */
+    private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+            + " id_msb = ?";
+    /** Inserts a link row; parameters are id_lsb, id_msb, create_time, arguments. */
+    private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+            + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
+
+
+ // DDL for the bridge table: a bridge is keyed by its own UUID halves and also records the
+ // UUID halves of the link it belongs to.
+ private static final String CREATE_BRIDGES_TABLE =
+ "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
+ + " id_msb bigint not null,"
+ + " create_time bigint not null,"
+ + " link_id_lsb bigint not null,"
+ + " link_id_msb bigint not null,"
+ + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
+ // Selects one bridge by its own id.
+ private static final String SELECT_FROM_BRIDGES =
+ "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+ // Deletes one bridge by its own id.
+ private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ + " WHERE id_lsb = ? and id_msb = ?";
+ // Despite the name, this is filtered: it selects the bridges of ONE link (parameters are the
+ // link's id_lsb then id_msb). The arguments blob is column 6 of the result.
+ private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
+ + " create_time,"
+ + " link_id_lsb, link_id_msb, "
+ + "arguments FROM " + BRIDGES_TABLE_NAME
+ + " WHERE link_id_lsb = ? and link_id_msb = ?";
+ // Existence probe for a bridge id, used before inserting.
+ private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
+ " WHERE id_lsb = ? and id_msb = ?";
+ // Inserts a bridge row; parameters are id_lsb, id_msb, create_time, link_id_lsb, link_id_msb, arguments.
+ private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+ + "create_time, "
+ + "link_id_lsb, link_id_msb, "
+ + "arguments )"
+ + " values (?, ?, ?, ?, ?, ?)";
+
+
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
@@ -197,12 +252,16 @@ public class DerbyMessageStore implement
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
if(!_configured)
{
_logSubject = logSubject;
+ }
+
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+ if(!_configured)
+ {
commonConfiguration(name, storeConfiguration, logSubject);
_configured = true;
@@ -219,6 +278,11 @@ public class DerbyMessageStore implement
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
+
+ if(!_configured)
+ {
+ _logSubject = logSubject;
+ }
CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
if(!_configured)
@@ -283,6 +347,8 @@ public class DerbyMessageStore implement
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
+ createLinkTable(conn);
+ createBridgeTable(conn);
conn.close();
}
@@ -419,6 +485,40 @@ public class DerbyMessageStore implement
}
+ private void createLinkTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(LINKS_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_LINKS_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+
+ private void createBridgeTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(BRIDGES_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_BRIDGES_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ }
+
+
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
@@ -459,7 +559,8 @@ public class DerbyMessageStore implement
List<String> exchanges = loadExchanges(erh);
ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
recoverBindings(brh, exchanges);
- brh.completeBindingRecovery();
+ ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ recoverBrokerLinks(lrh);
}
catch (SQLException e)
{
@@ -470,6 +571,144 @@ public class DerbyMessageStore implement
}
+    /**
+     * Replays every stored broker link into the recovery handler and, for each link, replays the
+     * bridges that belong to it.
+     *
+     * @param lrh handler that receives each recovered link and supplies the bridge handler for it
+     * @throws SQLException if the database cannot be read or a stored arguments blob is malformed
+     */
+    private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
+            throws SQLException
+    {
+        _logger.info("Recovering broker links...");
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
+
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+
+                try
+                {
+                    while(rs.next())
+                    {
+                        // The UUID constructor takes (msb, lsb); the columns come back as lsb, msb
+                        UUID id = new UUID(rs.getLong(2), rs.getLong(1));
+                        long createTime = rs.getLong(3);
+                        Map<String,String> arguments = decodeArgumentsBlob(rs.getBlob(4));
+
+                        ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
+
+                        recoverBridges(brh, id);
+                    }
+                    // NOTE(review): lrh.completeBrokerLinkRecovery() is never invoked here, unlike
+                    // completeBridgeRecoveryForLink() in recoverBridges - confirm whether the caller
+                    // is expected to do so.
+                }
+                catch (IOException e)
+                {
+                    throw new SQLException(e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+
+    }
+
+    /**
+     * Replays the stored bridges of one link into the recovery handler and then signals that the
+     * link's bridge recovery is complete.
+     *
+     * @param brh    handler that receives each recovered bridge
+     * @param linkId id of the link whose bridges are to be recovered
+     * @throws SQLException if the database cannot be read or a stored arguments blob is malformed
+     */
+    private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
+            throws SQLException
+    {
+        _logger.info("Recovering bridges for link " + linkId + "...");
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
+
+            try
+            {
+                // Bind inside the try so the statement is closed even if binding fails
+                stmt.setLong(1, linkId.getLeastSignificantBits());
+                stmt.setLong(2, linkId.getMostSignificantBits());
+
+                ResultSet rs = stmt.executeQuery();
+
+                try
+                {
+                    while(rs.next())
+                    {
+                        UUID id = new UUID(rs.getLong(2), rs.getLong(1));
+                        long createTime = rs.getLong(3);
+                        Map<String,String> arguments = decodeArgumentsBlob(rs.getBlob(6));
+
+                        brh.bridge(id, createTime, arguments);
+                    }
+                    brh.completeBridgeRecoveryForLink();
+                }
+                catch (IOException e)
+                {
+                    throw new SQLException(e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+
+    }
+
+    /**
+     * Decodes an arguments blob: an int entry count followed by that many UTF key/value pairs.
+     * A NULL column or a zero-length blob (which is what gets stored when a link or bridge has
+     * no arguments map) decodes to an empty map instead of failing on the missing count.
+     *
+     * @param blob the stored arguments, possibly null
+     * @return a mutable map of the decoded arguments, never null
+     * @throws SQLException if the blob cannot be read from the database
+     * @throws IOException  if the blob content is truncated or otherwise malformed
+     */
+    private Map<String,String> decodeArgumentsBlob(final Blob blob) throws SQLException, IOException
+    {
+        Map<String,String> arguments = new HashMap<String, String>();
+        if(blob == null || blob.length() == 0)
+        {
+            return arguments;
+        }
+
+        byte[] dataAsBytes = blob.getBytes(1,(int) blob.length());
+
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+        int size = dis.readInt();
+
+        for(int i = 0; i < size; i++)
+        {
+            arguments.put(dis.readUTF(), dis.readUTF());
+        }
+        return arguments;
+    }
+
private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -697,7 +936,7 @@ public class DerbyMessageStore implement
if (results == 0)
{
- throw new RuntimeException("Message metadata not found for message id " + messageId);
+ _logger.warn("Message metadata not found for message id " + messageId);
}
if (_logger.isDebugEnabled())
@@ -1180,6 +1419,233 @@ public class DerbyMessageStore implement
}
+    /**
+     * Stores a broker link, unless the store is recovering or a row with the link's id already
+     * exists.
+     *
+     * @param link the link to persist; its id, creation time and arguments are written
+     * @throws AMQStoreException if the database reports an error or the arguments cannot be encoded
+     */
+    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
+
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newAutoCommitConnection();
+                try
+                {
+                    PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
+                    try
+                    {
+
+                        stmt.setLong(1, link.getId().getLeastSignificantBits());
+                        stmt.setLong(2, link.getId().getMostSignificantBits());
+                        ResultSet rs = stmt.executeQuery();
+                        try
+                        {
+
+                            // If we don't have any data in the result set then we can add this link
+                            if (!rs.next())
+                            {
+                                PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
+
+                                try
+                                {
+
+                                    insertStmt.setLong(1, link.getId().getLeastSignificantBits());
+                                    insertStmt.setLong(2, link.getId().getMostSignificantBits());
+                                    insertStmt.setLong(3, link.getCreateTime());
+
+                                    byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
+                                    ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
+
+                                    insertStmt.setBinaryStream(4,bis,argumentBytes.length);
+
+                                    insertStmt.execute();
+                                }
+                                finally
+                                {
+                                    insertStmt.close();
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            rs.close();
+                        }
+                    }
+                    finally
+                    {
+                        stmt.close();
+                    }
+                }
+                finally
+                {
+                    // Release the connection on every path, including when a statement above throws
+                    conn.close();
+                }
+
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Encodes a string map as an int entry count followed by each key and value written with
+     * {@link DataOutputStream#writeUTF(String)}.
+     * <p>
+     * A null map is encoded as an empty one (count 0) rather than as zero bytes, because the
+     * recovery code in this class always starts by reading the count.
+     *
+     * @param arguments the map to encode, may be null
+     * @return the encoded bytes, never null and always at least four bytes long
+     * @throws AMQStoreException if writing to the in-memory buffer fails
+     */
+    private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+
+        try
+        {
+            if(arguments == null)
+            {
+                dos.writeInt(0);
+            }
+            else
+            {
+                dos.writeInt(arguments.size());
+                for(Map.Entry<String,String> arg : arguments.entrySet())
+                {
+                    dos.writeUTF(arg.getKey());
+                    dos.writeUTF(arg.getValue());
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            // This should never happen
+            throw new AMQStoreException(e.getMessage(), e);
+        }
+        return bos.toByteArray();
+    }
+
+    /**
+     * Deletes the stored row for a broker link.
+     *
+     * @param link the link whose id selects the row to delete
+     * @throws AMQStoreException if no row was deleted or the database reports an error
+     */
+    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        _logger.debug("public void deleteBrokerLink( " + link + "): called");
+        Connection connection = null;
+        PreparedStatement deleteStmt = null;
+        try
+        {
+            connection = newAutoCommitConnection();
+            deleteStmt = connection.prepareStatement(DELETE_FROM_LINKS);
+
+            final UUID linkId = link.getId();
+            deleteStmt.setLong(1, linkId.getLeastSignificantBits());
+            deleteStmt.setLong(2, linkId.getMostSignificantBits());
+
+            final boolean nothingDeleted = deleteStmt.executeUpdate() == 0;
+            if (nothingDeleted)
+            {
+                throw new AMQStoreException("Link " + link + " not found");
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            closePreparedStatement(deleteStmt);
+            closeConnection(connection);
+        }
+    }
+
+    /**
+     * Stores a bridge together with the id of its owning link, unless the store is recovering or
+     * a row with the bridge's id already exists.
+     *
+     * @param bridge the bridge to persist; its id, creation time, link id and arguments are written
+     * @throws AMQStoreException if the database reports an error or the arguments cannot be encoded
+     */
+    public void createBridge(final Bridge bridge) throws AMQStoreException
+    {
+        _logger.debug("public void createBridge(Bridge = " + bridge + "): called");
+
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newAutoCommitConnection();
+                try
+                {
+                    PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
+                    try
+                    {
+
+                        UUID id = bridge.getId();
+                        stmt.setLong(1, id.getLeastSignificantBits());
+                        stmt.setLong(2, id.getMostSignificantBits());
+                        ResultSet rs = stmt.executeQuery();
+                        try
+                        {
+
+                            // If we don't have any data in the result set then we can add this bridge
+                            if (!rs.next())
+                            {
+                                PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
+
+                                try
+                                {
+
+                                    insertStmt.setLong(1, id.getLeastSignificantBits());
+                                    insertStmt.setLong(2, id.getMostSignificantBits());
+
+                                    insertStmt.setLong(3, bridge.getCreateTime());
+
+                                    UUID linkId = bridge.getLink().getId();
+                                    insertStmt.setLong(4, linkId.getLeastSignificantBits());
+                                    insertStmt.setLong(5, linkId.getMostSignificantBits());
+
+                                    byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
+                                    ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
+
+                                    insertStmt.setBinaryStream(6,bis,argumentBytes.length);
+
+                                    insertStmt.execute();
+                                }
+                                finally
+                                {
+                                    insertStmt.close();
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            rs.close();
+                        }
+                    }
+                    finally
+                    {
+                        stmt.close();
+                    }
+                }
+                finally
+                {
+                    // Release the connection on every path, including when a statement above throws
+                    conn.close();
+                }
+
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Deletes the stored row for a bridge.
+     *
+     * @param bridge the bridge whose id selects the row to delete
+     * @throws AMQStoreException if no row was deleted or the database reports an error
+     */
+    public void deleteBridge(final Bridge bridge) throws AMQStoreException
+    {
+        _logger.debug("public void deleteBridge( " + bridge + "): called");
+        Connection connection = null;
+        PreparedStatement deleteStmt = null;
+        try
+        {
+            connection = newAutoCommitConnection();
+            deleteStmt = connection.prepareStatement(DELETE_FROM_BRIDGES);
+
+            final UUID bridgeId = bridge.getId();
+            deleteStmt.setLong(1, bridgeId.getLeastSignificantBits());
+            deleteStmt.setLong(2, bridgeId.getMostSignificantBits());
+
+            final boolean nothingDeleted = deleteStmt.executeUpdate() == 0;
+            if (nothingDeleted)
+            {
+                throw new AMQStoreException("Bridge " + bridge + " not found");
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            closePreparedStatement(deleteStmt);
+            closeConnection(connection);
+        }
+    }
+
public Transaction newTransaction()
{
return new DerbyTransaction();
@@ -1678,14 +2144,26 @@ public class DerbyMessageStore implement
}
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+            // A Derby-backed message that has not been written yet is stored on this
+            // transaction's connection before the queue entry that refers to it is recorded.
+            if(message.getStoredMessage() instanceof StoredDerbyMessage)
+            {
+                try
+                {
+                    ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+                }
+                catch (SQLException e)
+                {
+                    // Report the id of the message being enqueued; the bare _messageId used before
+                    // is not a member of this message.
+                    throw new AMQStoreException("Exception on enqueuing message " + message.getMessageNumber(), e);
+                }
+            }
+
+            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
+ DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1709,8 +2187,11 @@ public class DerbyMessageStore implement
{
private final long _messageId;
+ private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
- private Connection _conn;
+ private byte[] _data;
+ private volatile SoftReference<byte[]> _dataRef;
+
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
@@ -1721,27 +2202,19 @@ public class DerbyMessageStore implement
StoredDerbyMessage(long messageId,
StorableMessageMetaData metaData, boolean persist)
{
- try
- {
- _messageId = messageId;
+ _messageId = messageId;
+
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
- {
- _conn = newConnection();
- storeMetaData(_conn, messageId, metaData);
- }
- }
- catch (SQLException e)
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+ if(persist)
{
- throw new RuntimeException(e);
+ _metaData = metaData;
}
-
}
public StorableMessageMetaData getMetaData()
{
- StorableMessageMetaData metaData = _metaDataRef.get();
+ StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
try
@@ -1765,27 +2238,62 @@ public class DerbyMessageStore implement
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
- DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
+ src = src.slice();
+
+ if(_data == null)
+ {
+ _data = new byte[src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+ src.duplicate().get(_data);
+ }
+ else
+ {
+ byte[] oldData = _data;
+ _data = new byte[oldData.length + src.remaining()];
+ _dataRef = new SoftReference<byte[]>(_data);
+
+ System.arraycopy(oldData,0,_data,0,oldData.length);
+ src.duplicate().get(_data, oldData.length, src.remaining());
+ }
+
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
- return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ byte[] data = _dataRef == null ? null : _dataRef.get();
+ if(data != null)
+ {
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+ }
+ else
+ {
+ return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ }
+ }
+
+
+ /**
+ * Reads message content into a newly allocated buffer.
+ *
+ * @param offsetInMessage offset within the message content at which to start reading
+ * @param size capacity of the buffer to allocate
+ * @return a buffer of capacity size, positioned at 0
+ */
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ // NOTE(review): the number of bytes actually copied is ignored and the limit is left at
+ // size, so a short read is indistinguishable from trailing zero bytes - confirm callers
+ // never request more than is available.
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
+ }
- public StoreFuture flushToStore()
+ public synchronized StoreFuture flushToStore()
{
try
{
- if(_conn != null)
+ if(_metaData != null)
{
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Flushing message " + _messageId + " to store");
- }
+ Connection conn = newConnection();
+
+ store(conn);
- _conn.commit();
- _conn.close();
+ conn.commit();
+ conn.close();
}
}
catch (SQLException e)
@@ -1796,16 +2304,34 @@ public class DerbyMessageStore implement
}
throw new RuntimeException(e);
}
- finally
+ return IMMEDIATE_FUTURE;
+ }
+
+ private synchronized void store(final Connection conn) throws SQLException
+ {
+ if(_metaData != null)
{
- _conn = null;
+ try
+ {
+ storeMetaData(conn, _messageId, _metaData);
+ DerbyMessageStore.this.addContent(conn, _messageId, 0,
+ _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ }
+ finally
+ {
+ _metaData = null;
+ _data = null;
+ }
+ }
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
}
- return IMMEDIATE_FUTURE;
}
public void remove()
{
- flushToStore();
DerbyMessageStore.this.removeMessage(_messageId);
}
}
@@ -1839,4 +2365,5 @@ public class DerbyMessageStore implement
}
}
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Thu Mar 1 15:42:44 2012
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
@@ -128,4 +130,12 @@ public interface DurableConfigurationSto
* @throws AMQStoreException If the operation fails for any reason.
*/
void updateQueue(AMQQueue queue) throws AMQStoreException;
+
+ void createBrokerLink(BrokerLink link) throws AMQStoreException;
+
+ void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
+
+ void createBridge(Bridge bridge) throws AMQStoreException;
+
+ void deleteBridge(Bridge bridge) throws AMQStoreException;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Mar 1 15:42:44 2012
@@ -31,14 +31,18 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -53,11 +57,11 @@ public class MemoryMessageStore implemen
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
}
@@ -155,6 +159,26 @@ public class MemoryMessageStore implemen
// Not required to do anything
}
+ public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+
+ }
+
+ public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+ {
+
+ }
+
+ public void createBridge(final Bridge bridge) throws AMQStoreException
+ {
+
+ }
+
+ public void deleteBridge(final Bridge bridge) throws AMQStoreException
+ {
+
+ }
+
public void configureTransactionLog(String name,
TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration,
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Mar 1 15:42:44 2012
@@ -20,14 +20,16 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.message.EnqueableMessage;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
*/
-public interface MessageStore extends DurableConfigurationStore, TransactionLog
+public interface MessageStore
{
StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
{
@@ -77,4 +79,69 @@ public interface MessageStore extends Du
boolean isPersistent();
+
+ public static interface Transaction
+ {
+ /**
+ * Places a message onto a specified queue, in a given transactional context.
+ *
+ *
+ *
+ * @param queue The queue to place the message on.
+ * @param message The message to enqueue.
+ * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+ */
+ void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+ /**
+ * Extracts a message from a specified queue, in a given transactional context.
+ *
+ * @param queue The queue to remove the message from.
+ * @param message The message to dequeue.
+ * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void commitTran() throws AMQStoreException;
+
+ /**
+ * Commits all operations performed within a given transactional context, returning a StoreFuture for the commit instead of void.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ StoreFuture commitTranAsync() throws AMQStoreException;
+
+ /**
+ * Abandons all operations performed within a given transactional context.
+ *
+ * @throws AMQStoreException If the operation fails for any reason.
+ */
+ void abortTran() throws AMQStoreException;
+
+
+
+ }
+
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception;
+
+ Transaction newTransaction();
+
+
+
+ public static interface StoreFuture
+ {
+ boolean isComplete();
+
+ void waitForCompletion();
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Thu Mar 1 15:42:44 2012
@@ -30,5 +30,7 @@ public interface StorableMessageMetaData
int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
+ int getContentSize();
+
boolean isPersistent();
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org