You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/17 14:17:11 UTC
svn commit: r900135 - in /qpid/branches/0.5.x-dev/qpid/java:
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qpid/server/subscription/
broker/src/test/java/or...
Author: rgodfrey
Date: Sun Jan 17 13:17:10 2010
New Revision: 900135
URL: http://svn.apache.org/viewvc?rev=900135&view=rev
Log:
QPID-2321 : Updated conflation queue to use same argument names as C++ for LVQ; added support for "browse only" consumers
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Sun Jan 17 13:17:10 2010
@@ -68,11 +68,6 @@
return _config.getInt("priorities", -1);
}
- public String getConflationKey()
- {
- return _config.getString("conflationKey", null);
- }
-
public String getExchange()
{
return _config.getString("exchange", null);
@@ -123,4 +118,14 @@
return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
}
+ public boolean isLVQ()
+ {
+ return _config.getBoolean("lvq", false);
+ }
+
+ public String getLVQKey()
+ {
+ return _config.getString("lvqKey", null);
+ }
+
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun Jan 17 13:17:10 2010
@@ -52,19 +52,27 @@
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired())
+ if(!subscription.isClosed())
{
- if(subscription.setLastSeenEntry(subnode,entry))
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
{
- break;
- }
- else
- {
- subnode = subscription.getLastSeenEntry();
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ {
+ if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
+ }
+ }
}
}
-
}
}
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Jan 17 13:17:10 2010
@@ -40,6 +40,10 @@
public interface AMQQueue extends Managable, Comparable<AMQQueue>
{
+ public interface Context
+ {
+ QueueEntry getLastSeenEntry();
+ }
AMQShortString getName();
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Jan 17 13:17:10 2010
@@ -33,7 +33,9 @@
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
- public static final AMQShortString X_QPID_CONFLATION_KEY = new AMQShortString ("x-qpid-conflation-key");
+ private static final AMQShortString QPID_LAST_VALUE_QUEUE = new AMQShortString ("qpid.last_value_queue");
+ private static final AMQShortString QPID_LAST_VALUE_QUEUE_KEY = new AMQShortString("qpid.last_value_queue_key");
+ private static final String QPID_LVQ_KEY = "qpid.LVQ_key";
private abstract static class QueueProperty
{
@@ -134,7 +136,17 @@
throws AMQException
{
final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
- final String conflationKey = arguments == null ? null : arguments.containsKey(X_QPID_CONFLATION_KEY) ? arguments.getString(X_QPID_CONFLATION_KEY) : null;
+ String conflationKey = null;
+
+ if(arguments != null && (arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)))
+ {
+ conflationKey = arguments.getString(QPID_LAST_VALUE_QUEUE_KEY);
+ if(conflationKey == null)
+ {
+ conflationKey = QPID_LVQ_KEY;
+ }
+ }
+
AMQQueue q = null;
if(conflationKey != null)
@@ -190,14 +202,15 @@
}
arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
}
- String conflationKey = config.getConflationKey();
- if(conflationKey != null)
+ if(config.isLVQ() || config.getLVQKey() != null)
{
if(arguments == null)
{
arguments = new FieldTable();
}
- arguments.put(new AMQShortString("x-qpid-conflation-key"), conflationKey);
+ arguments.setInteger(QPID_LAST_VALUE_QUEUE, 1);
+ arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+
}
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);
Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=900135&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Sun Jan 17 13:17:10 2010
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+final class QueueContext implements AMQQueue.Context
+{
+ volatile QueueEntry _lastSeenEntry;
+ volatile QueueEntry _releasedEntry;
+
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ _lastSeenUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ _releasedUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueContext.class, QueueEntry.class, "_releasedEntry");
+
+ public QueueContext(QueueEntry head)
+ {
+ _lastSeenEntry = head;
+ }
+
+ public QueueEntry getLastSeenEntry()
+ {
+ return _lastSeenEntry;
+ }
+}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Jan 17 13:17:10 2010
@@ -295,7 +295,7 @@
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
- subscription.setLastSeenEntry(null, _entries.getHead());
+ subscription.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
@@ -330,12 +330,7 @@
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- QueueEntry lastSeen;
-
- while ((lastSeen = subscription.getLastSeenEntry()) != null)
- {
- subscription.setLastSeenEntry(lastSeen, null);
- }
+ subscription.setQueueContext(null);
// auto-delete queues must be deleted if there are no remaining subscribers
@@ -520,68 +515,42 @@
_logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
}
sub.send(entry);
+
+ setLastSeenEntry(sub,entry);
}
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
+ private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
{
+ return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
+ }
- // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
- // interest in.
- QueueEntry node = sub.getLastSeenEntry();
- while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
- {
-
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- node = null;
- break;
- }
-
- }
+ private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ {
+ QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueEntry releasedEntry = subContext._releasedEntry;
- if (node == entry)
+ QueueContext._lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
{
- // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
- // good
- return true;
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
}
- else
- {
- // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
- // no-one else has updated it to something furhter on in the list
- //TODO - check
- //updateLastSeenEntry(sub, entry);
- return false;
- }
-
}
- private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
{
- QueueEntry node = sub.getLastSeenEntry();
-
- if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+ QueueContext subContext = (QueueContext) sub.getQueueContext();
+ if(subContext != null)
{
- do
+ QueueEntry oldEntry;
+
+ while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
{
- if (sub.setLastSeenEntry(node, entry))
- {
- return;
- }
- else
+ if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
{
- node = sub.getLastSeenEntry();
+ break;
}
}
- while (node != null && entry.compareTo(node) < 0);
}
-
}
public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
@@ -596,7 +565,7 @@
// we don't make browsers send the same stuff twice
if (!sub.isBrowser())
{
- updateLastSeenEntry(sub, entry);
+ updateSubRequeueEntry(sub, entry);
}
}
@@ -1408,16 +1377,12 @@
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
- boolean advanced = false;
boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (_logger.isDebugEnabled())
- {
- _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
- }
- if (!(node.isAcquired() || node.isDeleted()))
+ QueueEntry node = getNextAvailableEntry(sub);
+
+ if (node != null && !(node.isAcquired() || node.isDeleted()))
{
if (sub.hasInterest(node))
{
@@ -1430,19 +1395,6 @@
else
{
deliverMessage(sub, node);
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
-
- }
}
}
@@ -1462,17 +1414,8 @@
node.addStateChangeListener(new QueueEntryListener(sub, node));
}
}
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
}
- atTail = (_entries.next(node) == null) && !advanced;
+ atTail = (node == null) || (_entries.next(node) == null);
}
return atTail || !subActive;
}
@@ -1484,44 +1427,55 @@
{
SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
Subscription sub = subNode.getSubscription();
- moveSubscriptionToNextNode(sub);
+ if(!sub.isBrowser())
+ {
+ getNextAvailableEntry(sub);
+ }
+ else
+ {
+ // TODO
+ }
}
}
- private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
+ private QueueEntry getNextAvailableEntry(final Subscription sub)
throws AMQException
{
- QueueEntry node = sub.getLastSeenEntry();
-
- while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
{
- if (!node.isAcquired() && !node.isDeleted() && node.expired())
+ QueueEntry lastSeen = context._lastSeenEntry;
+ QueueEntry releasedNode = context._releasedEntry;
+
+ QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+
+ boolean expired = false;
+ while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
{
- if (node.acquire())
+ if (expired)
{
- final StoreContext reapingStoreContext = new StoreContext();
- node.discard(reapingStoreContext);
+ expired = false;
+ if (node.acquire())
+ {
+ node.discard(new StoreContext());
+ }
}
- }
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- break;
- }
- }
+ if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
+ {
+ QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
+ }
- if (_logger.isDebugEnabled())
+ lastSeen = context._lastSeenEntry;
+ releasedNode = context._releasedEntry;
+ node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
+ }
+ return node;
+ }
+ else
{
- _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+ return null;
}
-
- return node;
}
private void processQueue(Runnable runner) throws AMQException
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Jan 17 13:17:10 2010
@@ -87,9 +87,9 @@
public State getState();
- QueueEntry getLastSeenEntry();
+ AMQQueue.Context getQueueContext();
- boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+ void setQueueContext(AMQQueue.Context queueContext);
boolean isActive();
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Jan 17 13:17:10 2010
@@ -65,7 +65,7 @@
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+ private AMQQueue.Context _queueContext;
private final ClientDeliveryMethod _deliveryMethod;
private final RecordDeliveryMethod _recordMethod;
@@ -627,23 +627,14 @@
_stateListener = listener;
}
-
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- QueueEntry entry = _queueContext.get();
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
- }
-
- return entry;
+ return _queueContext;
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
+ public void setQueueContext(AMQQueue.Context context)
{
- _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
- return _queueContext.compareAndSet(expected,newvalue);
+ _queueContext = context;
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Sun Jan 17 13:17:10 2010
@@ -186,7 +186,7 @@
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(null, messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
@@ -197,8 +197,8 @@
AMQMessage messageB = createMessage(new Long (25));
_queue.enqueue(null, messageB);
- QueueEntry entry = _subscription.getLastSeenEntry();
- assertNull(entry);
+
+ assertNull(_subscription.getQueueContext());
}
public void testQueueNoSubscriber() throws AMQException, InterruptedException
@@ -207,7 +207,7 @@
_queue.enqueue(null, messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
}
public void testExclusiveConsumer() throws AMQException
@@ -224,7 +224,7 @@
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(null, messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
@@ -273,7 +273,7 @@
Long id = new Long(26);
AMQMessage message = createMessage(id);
_queue.enqueue(null, message);
- QueueEntry entry = _subscription.getLastSeenEntry();
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sun Jan 17 13:17:10 2010
@@ -42,7 +42,7 @@
private AMQShortString tag = new AMQShortString("mocktag");
private AMQQueue queue = null;
private StateListener _listener = null;
- private QueueEntry lastSeen = null;
+ private AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
@@ -81,11 +81,6 @@
return _subscriptionID;
}
- public QueueEntry getLastSeenEntry()
- {
- return lastSeen;
- }
-
public SubscriptionAcquiredState getOwningState()
{
return new QueueEntry.SubscriptionAcquiredState(this);
@@ -155,21 +150,9 @@
public void send(QueueEntry msg) throws AMQException
{
- lastSeen = msg;
messages.add(msg);
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
- {
- boolean result = false;
- if (expected != null)
- {
- result = (expected.equals(lastSeen));
- }
- lastSeen = newValue;
- return result;
- }
-
public void setQueue(AMQQueue queue, boolean exclusive)
{
this.queue = queue;
@@ -185,6 +168,16 @@
return _state;
}
+ public AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
public boolean wouldSuspend(QueueEntry msg)
{
return false;
Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Sun Jan 17 13:17:10 2010
@@ -47,6 +47,8 @@
protected final boolean _isAutoDelete;
+ private final boolean _browseOnly;
+
private AMQShortString _queueName;
private AMQShortString _routingKey;
@@ -82,6 +84,7 @@
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+ _browseOnly = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_BROWSE));
_queueName = binding.getQueueName() == null ? null : binding.getQueueName();
_routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey();
_bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
@@ -122,6 +125,12 @@
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
{
+ this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,bindingKeys, false);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+ {
if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) ||
ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass))
&& routingKey == null)
@@ -144,6 +153,7 @@
_queueName = queueName;
_isDurable = isDurable;
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
+ _browseOnly = browseOnly;
}
public AMQShortString getEncodedName()
@@ -502,4 +512,9 @@
return new AMQAnyDestination(binding);
}
}
+
+ public boolean isBrowseOnly()
+ {
+ return _browseOnly;
+ }
}
Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Jan 17 13:17:10 2010
@@ -902,7 +902,7 @@
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- false, false);
+ ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public C createExclusiveConsumer(Destination destination) throws JMSException
@@ -910,7 +910,7 @@
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
- false, false);
+ ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -918,7 +918,7 @@
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, false, false);
+ messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -927,7 +927,7 @@
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, false, false);
+ messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -944,7 +944,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -952,7 +952,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -960,7 +960,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -968,7 +968,7 @@
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
false);
}
Modified: qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java Sun Jan 17 13:17:10 2010
@@ -31,6 +31,7 @@
public static final String OPTION_EXCLUSIVE = "exclusive";
public static final String OPTION_AUTODELETE = "autodelete";
public static final String OPTION_DURABLE = "durable";
+ public static final String OPTION_BROWSE = "browse";
public static final String OPTION_CLIENTID = "clientid";
public static final String OPTION_SUBSCRIPTION = "subscription";
public static final String OPTION_ROUTING_KEY = "routingkey";
Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java Sun Jan 17 13:17:10 2010
@@ -27,6 +27,7 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.AMQException;
+import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.framing.AMQShortString;
import javax.naming.Context;
@@ -43,6 +44,7 @@
import java.util.Hashtable;
import java.util.Map;
import java.util.HashMap;
+import java.net.URISyntaxException;
import junit.framework.TestCase;
@@ -118,7 +120,7 @@
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-conflation-key","key");
+ arguments.put("qpid.last_value_queue_key","key");
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
@@ -163,7 +165,7 @@
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-conflation-key","key");
+ arguments.put("qpid.last_value_queue_key","key");
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
@@ -239,7 +241,7 @@
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-conflation-key","key");
+ arguments.put("qpid.last_value_queue_key","key");
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
@@ -310,6 +312,67 @@
}
+ public void testConflationBrowser() throws JMSException, NamingException, AMQException, InterruptedException, URISyntaxException
+ {
+ consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // Scenario: fill a last-value queue, then read it through a destination whose URL sets browse='true', logging what arrives.
+ // Declare QUEUE with the "qpid.last_value_queue_key" argument set to "key", then declare and bind it via an AMQQueue built from "amq.direct" and QUEUE.
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key","key");
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+ // Send MSG_COUNT messages; whenever msg is a multiple of 10000 (including msg == 0) log progress and sleep for one second.
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(nextMessage(msg, false, producerSession, producer));
+ if(msg%10000 == 0)
+ {
+ System.err.println("Sent... " + msg);
+ Thread.sleep(1000);
+ }
+
+ }
+ // sync() is called before the consumer is created; presumably it waits for the broker to process the sends - TODO confirm.
+ ((AMQSession)producerSession).sync();
+
+ // Build a second destination for the same queue from a binding URL that carries the browse='true' option.
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'");
+ AMQQueue browseQueue = new AMQQueue(url);
+ // NOTE(review): this is expected to yield a browse-only consumer via AMQDestination.isBrowseOnly() - confirm for the single-argument createConsumer overload.
+ consumer = consumerSession.createConsumer(browseQueue);
+ consumerConnection.start();
+ Message received;
+ int receivedCount = 0;
+ while((received = consumer.receive(1000))!=null)
+ {
+ receivedCount++;
+ System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ }
+ // The first drain ends as soon as receive(1000) returns null.
+ System.err.println("Received Count: " + receivedCount);
+ // Publish one more message (index MSG_COUNT) and keep receiving on the same consumer, now with a 5000 ms timeout.
+ producer.send(nextMessage(MSG_COUNT, false, producerSession, producer));
+
+ ((AMQSession)producerSession).sync();
+
+ while((received = consumer.receive(5000))!=null)
+ {
+ receivedCount++;
+ System.err.println("Message: " + received.getIntProperty("msg") + " Conflation Key: " + received.getStringProperty("key"));
+ }
+
+ System.err.println("Received Count: " + receivedCount);
+ // NOTE(review): counts are printed rather than asserted, and only producer-side resources are closed here - confirm the consumer side is cleaned up elsewhere.
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+
+
+ }
Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=900135&r1=900134&r2=900135&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun Jan 17 13:17:10 2010
@@ -117,6 +117,16 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public AMQQueue.Context getQueueContext()
+ {
+ return null; // Stub: this helper always reports no queue context.
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ // Stub: empty body, so the supplied queueContext is discarded.
+ }
+
public QueueEntry getLastSeenEntry()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org